From 71b5041da4eef5ad2007a1a49f04ecaf2391bd5b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 30 Nov 2021 23:26:28 +0900
Subject: [PATCH v8] Improve parallel vacuum implementation.

Previously, in parallel vacuum, we allocated a shmem area of
IndexBulkDeleteResult only for indexes where parallel index vacuuming
is safe, and used a null-bitmap in the shmem area to locate them. This
logic was too complicated, with a small benefit of saving only a few
bits per index.

In this commit, we allocate a dedicated shmem area for an array of
LVParallelIndStats, each element of which includes a parallel-safety
flag, the index vacuum status, and an IndexBulkDeleteResult. There is
one array element for every index, even those indexes where parallel
index vacuuming is unsafe or not worthwhile. This makes the code
clearer by removing all bitmap-related code.

Also, check each index's vacuum status after parallel index vacuum to
make sure that all indexes have been processed.

Finally, rename parallel vacuum functions to parallel_vacuum_* for
consistency.

Author: Masahiko Sawada, based on suggestions by Andres Freund
Reviewed-by: Hou Zhijie, Amit Kapila
Discussion: https://www.postgresql.org/message-id/20211030212101.ae3qcouatwmy7tbr%40alap3.anarazel.de
---
 src/backend/access/heap/vacuumlazy.c | 1222 +++++++++++++++++-----------------
 src/tools/pgindent/typedefs.list     |    2 +
 2 files changed, 597 insertions(+), 627 deletions(-)
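
For illustration only (this note sits after the "---" marker, so git am
ignores it; it is not part of the commit message or the diff): a minimal
standalone C sketch of the lookup change, using simplified placeholder
names rather than the actual identifiers. Under the old scheme, a stats
slot existed in shared memory only for parallel-safe indexes and was
located by walking a null-bitmap; under the new scheme, every index owns
a fixed slot in a flat array, so the lookup is plain indexing.

#include <stdbool.h>
#include <stddef.h>

typedef struct IndStats
{
	bool		updated;
	long		num_pages;
} IndStats;

/* Old scheme: packed slots; find slot 'getidx' by skipping NULL entries. */
static IndStats *
old_stats_for_idx(const unsigned char *bitmap, char *area, int getidx)
{
	/* No slot at all if the bit for this index is unset */
	if (!(bitmap[getidx >> 3] & (1 << (getidx & 0x07))))
		return NULL;

	/* Walk past the slots of all preceding parallel-safe indexes */
	for (int i = 0; i < getidx; i++)
	{
		if (bitmap[i >> 3] & (1 << (i & 0x07)))
			area += sizeof(IndStats);
	}
	return (IndStats *) area;
}

/* New scheme: one slot per index, unconditionally. */
static IndStats *
new_stats_for_idx(IndStats *array, int getidx)
{
	return &array[getidx];
}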

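A second sketch, with the same caveats (simplified, hypothetical names):
the per-index status lifecycle that replaces the old for_cleanup and
first_time flags and enables the new post-check. The leader arms every
slot before a pass, the leader and workers mark slots completed as they
process them, and the leader then verifies that no index was skipped
before resetting the slots.

typedef enum
{
	STATUS_INITIAL,
	STATUS_NEED_BULKDELETE,
	STATUS_NEED_CLEANUP,
	STATUS_COMPLETED
} IndVacStatus;

/* Leader-side skeleton of one index vacuum or index cleanup pass */
static void
leader_run_one_pass(IndVacStatus *status, int nindexes, bool vacuum)
{
	/* Arm every slot; each must be back at INITIAL from the previous pass */
	for (int i = 0; i < nindexes; i++)
		status[i] = vacuum ? STATUS_NEED_BULKDELETE : STATUS_NEED_CLEANUP;

	/*
	 * ... leader and workers claim indexes via a shared atomic counter and
	 * set STATUS_COMPLETED on each slot they finish ...
	 */

	/* Post-check: make sure no index was skipped */
	for (int i = 0; i < nindexes; i++)
	{
		if (status[i] != STATUS_COMPLETED)
			return;				/* the real patch raises an ERROR here */
		status[i] = STATUS_INITIAL; /* reset for the next pass */
	}
}
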
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 282b44f..6d9f890 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -130,6 +130,7 @@
 #define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
 #define PARALLEL_VACUUM_KEY_WAL_USAGE		5
+#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
 
 /*
  * Macro to check if we are in a parallel vacuum.  If true, we are in the
@@ -182,14 +183,6 @@ typedef struct LVShared
 	int			elevel;
 
 	/*
-	 * An indication for vacuum workers to perform either index vacuum or
-	 * index cleanup.  first_time is true only if for_cleanup is true and
-	 * bulk-deletion is not performed yet.
-	 */
-	bool		for_cleanup;
-	bool		first_time;
-
-	/*
 	 * Fields for both index vacuum and cleanup.
 	 *
 	 * reltuples is the total number of input heap tuples.  We set either old
@@ -226,33 +219,44 @@ typedef struct LVShared
 	 */
 	pg_atomic_uint32 active_nworkers;
 
-	/*
-	 * Variables to control parallel vacuum.  We have a bitmap to indicate
-	 * which index has stats in shared memory.  The set bit in the map
-	 * indicates that the particular index supports a parallel vacuum.
-	 */
-	pg_atomic_uint32 idx;		/* counter for vacuuming and clean up */
-	uint32		offset;			/* sizeof header incl. bitmap */
-	bits8		bitmap[FLEXIBLE_ARRAY_MEMBER];	/* bit map of NULLs */
-
-	/* Shared index statistics data follows at end of struct */
+	/* Counter for vacuuming and cleanup */
+	pg_atomic_uint32 idx;
 } LVShared;
 
-#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
-#define GetSharedIndStats(s) \
-	((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
-#define IndStatsIsNull(s, i) \
-	(!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
+/* Status used during parallel index vacuum or cleanup */
+typedef enum LVParallelIndVacStatus
+{
+	PARALLEL_INDVAC_STATUS_INITIAL = 0,
+	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
+	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
+	PARALLEL_INDVAC_STATUS_COMPLETED
+} LVParallelIndVacStatus;
 
 /*
- * Struct for an index bulk-deletion statistic used for parallel vacuum.  This
- * is allocated in the DSM segment.
+ * Struct for the index vacuum statistics of an index used by parallel vacuum.
+ * This includes the status of parallel index vacuum as well as index statistics.
  */
-typedef struct LVSharedIndStats
+typedef struct LVParallelIndStats
 {
-	bool		updated;		/* are the stats updated? */
+	/*
+	 * The following two fields are set by the leader process before executing
+	 * parallel index vacuum or parallel index cleanup.  These fields are not
+	 * fixed for the entire VACUUM operation.  They are fixed only for an
+	 * individual parallel index vacuum or cleanup pass.
+	 *
+	 * parallel_workers_can_process is true if both the leader and workers can
+	 * process the index; otherwise only the leader can process it.
+	 */
+	LVParallelIndVacStatus status;
+	bool		parallel_workers_can_process;
+
+	/*
+	 * Individual worker or leader stores the result of index vacuum or
+	 * cleanup.
+	 */
+	bool		istat_updated;	/* are the stats updated? */
 	IndexBulkDeleteResult istat;
-} LVSharedIndStats;
+} LVParallelIndStats;
 
 /* Struct for maintaining a parallel vacuum state. */
 typedef struct LVParallelState
@@ -262,6 +266,16 @@ typedef struct LVParallelState
 	/* Shared information among parallel vacuum workers */
 	LVShared   *lvshared;
 
+	/*
+	 * Shared index statistics among parallel vacuum workers.  An array
+	 * element is allocated for every index, even those indexes where parallel
+	 * index vacuuming is unsafe or not worthwhile (i.e.,
+	 * will_parallel_vacuum[] is false).  During parallel vacuum, the
+	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
+	 * local memory at the end of parallel vacuum.
+	 */
+	LVParallelIndStats *lvpindstats;
+
 	/* Points to buffer usage area in DSM */
 	BufferUsage *buffer_usage;
 
@@ -269,6 +283,13 @@ typedef struct LVParallelState
 	WalUsage   *wal_usage;
 
 	/*
+	 * False if the index is a totally unsuitable target for all parallel
+	 * processing.  For example, the index could be smaller than the
+	 * min_parallel_index_scan_size cutoff.
+	 */
+	bool	   *will_parallel_vacuum;
+
+	/*
 	 * The number of indexes that support parallel index bulk-deletion and
 	 * parallel index cleanup respectively.
 	 */
@@ -391,18 +412,6 @@ static int	lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
 									LVRelState *vacrel);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
-static void do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel);
-static void do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel);
-static void do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers);
-static void do_parallel_processing(LVRelState *vacrel,
-								   LVShared *lvshared);
-static void do_serial_processing_for_unsafe_indexes(LVRelState *vacrel,
-													LVShared *lvshared);
-static IndexBulkDeleteResult *parallel_process_one_index(Relation indrel,
-														 IndexBulkDeleteResult *istat,
-														 LVShared *lvshared,
-														 LVSharedIndStats *shared_indstats,
-														 LVRelState *vacrel);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
 													IndexBulkDeleteResult *istat,
@@ -425,14 +434,20 @@ static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
-static int	compute_parallel_vacuum_workers(LVRelState *vacrel,
-											int nrequested,
-											bool *will_parallel_vacuum);
 static void update_index_statistics(LVRelState *vacrel);
-static void begin_parallel_vacuum(LVRelState *vacrel, int nrequested);
-static void end_parallel_vacuum(LVRelState *vacrel);
-static LVSharedIndStats *parallel_stats_for_idx(LVShared *lvshared, int getidx);
-static bool parallel_processing_is_safe(Relation indrel, LVShared *lvshared);
+static int	parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
+											bool *will_parallel_vacuum);
+static void parallel_vacuum_begin(LVRelState *vacrel, int nrequested);
+static void parallel_vacuum_end(LVRelState *vacrel);
+static bool parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
+												   bool vacuum);
+static void parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum);
+static void parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
+												 LVParallelIndStats *pindstats);
+static void parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel);
+static void parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
+											  LVShared *shared,
+											  LVParallelIndStats *pindstats);
 static void vacuum_error_callback(void *arg);
 static void update_vacuum_error_info(LVRelState *vacrel,
 									 LVSavedErrInfo *saved_vacrel,
@@ -2237,7 +2252,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	else
 	{
 		/* Outsource everything to parallel variant */
-		do_parallel_lazy_vacuum_all_indexes(vacrel);
+		parallel_vacuum_process_all_indexes(vacrel, true);
 
 		/*
 		 * Do a postcheck to consider applying wraparound failsafe now.  Note
@@ -2611,465 +2626,120 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 }
 
 /*
- * Perform lazy_vacuum_all_indexes() steps in parallel
- */
-static void
-do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel)
-{
-	/* Tell parallel workers to do index vacuuming */
-	vacrel->lps->lvshared->for_cleanup = false;
-	vacrel->lps->lvshared->first_time = false;
-
-	/*
-	 * We can only provide an approximate value of num_heap_tuples, at least
-	 * for now.  Matches serial VACUUM case.
-	 */
-	vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
-	vacrel->lps->lvshared->estimated_count = true;
-
-	do_parallel_vacuum_or_cleanup(vacrel,
-								  vacrel->lps->nindexes_parallel_bulkdel);
-}
-
-/*
- * Perform lazy_cleanup_all_indexes() steps in parallel
- */
-static void
-do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel)
-{
-	int			nworkers;
-
-	/*
-	 * If parallel vacuum is active we perform index cleanup with parallel
-	 * workers.
-	 *
-	 * Tell parallel workers to do index cleanup.
-	 */
-	vacrel->lps->lvshared->for_cleanup = true;
-	vacrel->lps->lvshared->first_time = (vacrel->num_index_scans == 0);
-
-	/*
-	 * Now we can provide a better estimate of total number of surviving
-	 * tuples (we assume indexes are more interested in that than in the
-	 * number of nominally live tuples).
-	 */
-	vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
-	vacrel->lps->lvshared->estimated_count =
-		(vacrel->tupcount_pages < vacrel->rel_pages);
-
-	/* Determine the number of parallel workers to launch */
-	if (vacrel->lps->lvshared->first_time)
-		nworkers = vacrel->lps->nindexes_parallel_cleanup +
-			vacrel->lps->nindexes_parallel_condcleanup;
-	else
-		nworkers = vacrel->lps->nindexes_parallel_cleanup;
-
-	do_parallel_vacuum_or_cleanup(vacrel, nworkers);
-}
-
-/*
- * Perform index vacuum or index cleanup with parallel workers.  This function
- * must be used by the parallel vacuum leader process.  The caller must set
- * lps->lvshared->for_cleanup to indicate whether to perform vacuum or
- * cleanup.
+ *	lazy_cleanup_all_indexes() -- cleanup all indexes of relation.
  */
 static void
-do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
+lazy_cleanup_all_indexes(LVRelState *vacrel)
 {
-	LVParallelState *lps = vacrel->lps;
-
 	Assert(!IsParallelWorker());
-	Assert(ParallelVacuumIsActive(vacrel));
 	Assert(vacrel->nindexes > 0);
 
-	/* The leader process will participate */
-	nworkers--;
-
-	/*
-	 * It is possible that parallel context is initialized with fewer workers
-	 * than the number of indexes that need a separate worker in the current
-	 * phase, so we need to consider it.  See compute_parallel_vacuum_workers.
-	 */
-	nworkers = Min(nworkers, lps->pcxt->nworkers);
+	/* Report that we are now cleaning up indexes */
+	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
-	/* Setup the shared cost-based vacuum delay and launch workers */
-	if (nworkers > 0)
+	if (!ParallelVacuumIsActive(vacrel))
 	{
-		if (vacrel->num_index_scans > 0)
-		{
-			/* Reset the parallel index processing counter */
-			pg_atomic_write_u32(&(lps->lvshared->idx), 0);
-
-			/* Reinitialize the parallel context to relaunch parallel workers */
-			ReinitializeParallelDSM(lps->pcxt);
-		}
-
-		/*
-		 * Set up shared cost balance and the number of active workers for
-		 * vacuum delay.  We need to do this before launching workers as
-		 * otherwise, they might not see the updated values for these
-		 * parameters.
-		 */
-		pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
-		pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0);
-
-		/*
-		 * The number of workers can vary between bulkdelete and cleanup
-		 * phase.
-		 */
-		ReinitializeParallelWorkers(lps->pcxt, nworkers);
-
-		LaunchParallelWorkers(lps->pcxt);
+		double		reltuples = vacrel->new_rel_tuples;
+		bool		estimated_count =
+		vacrel->tupcount_pages < vacrel->rel_pages;
 
-		if (lps->pcxt->nworkers_launched > 0)
+		for (int idx = 0; idx < vacrel->nindexes; idx++)
 		{
-			/*
-			 * Reset the local cost values for leader backend as we have
-			 * already accumulated the remaining balance of heap.
-			 */
-			VacuumCostBalance = 0;
-			VacuumCostBalanceLocal = 0;
+			Relation	indrel = vacrel->indrels[idx];
+			IndexBulkDeleteResult *istat = vacrel->indstats[idx];
 
-			/* Enable shared cost balance for leader backend */
-			VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
-			VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
+			vacrel->indstats[idx] =
+				lazy_cleanup_one_index(indrel, istat, reltuples,
+									   estimated_count, vacrel);
 		}
-
-		if (lps->lvshared->for_cleanup)
-			ereport(elevel,
-					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
-									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
-									 lps->pcxt->nworkers_launched),
-							lps->pcxt->nworkers_launched, nworkers)));
-		else
-			ereport(elevel,
-					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
-									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
-									 lps->pcxt->nworkers_launched),
-							lps->pcxt->nworkers_launched, nworkers)));
-	}
-
-	/* Process the indexes that can be processed by only leader process */
-	do_serial_processing_for_unsafe_indexes(vacrel, lps->lvshared);
-
-	/*
-	 * Join as a parallel worker.  The leader process alone processes all the
-	 * indexes in the case where no workers are launched.
-	 */
-	do_parallel_processing(vacrel, lps->lvshared);
-
-	/*
-	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
-	 * to finish, or we might get incomplete data.)
-	 */
-	if (nworkers > 0)
-	{
-		/* Wait for all vacuum workers to finish */
-		WaitForParallelWorkersToFinish(lps->pcxt);
-
-		for (int i = 0; i < lps->pcxt->nworkers_launched; i++)
-			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
 	}
-
-	/*
-	 * Carry the shared balance value to heap scan and disable shared costing
-	 */
-	if (VacuumSharedCostBalance)
+	else
 	{
-		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
-		VacuumSharedCostBalance = NULL;
-		VacuumActiveNWorkers = NULL;
+		/* Outsource everything to parallel variant */
+		parallel_vacuum_process_all_indexes(vacrel, false);
 	}
 }
 
 /*
- * Index vacuum/cleanup routine used by the leader process and parallel
- * vacuum worker processes to process the indexes in parallel.
+ *	lazy_vacuum_one_index() -- vacuum index relation.
+ *
+ *		Delete all the index tuples containing a TID collected in
+ *		vacrel->dead_items array.  Also update running statistics.
+ *		Exact details depend on index AM's ambulkdelete routine.
+ *
+ *		reltuples is the number of heap tuples to be passed to the
+ *		bulkdelete callback.  It's always assumed to be estimated.
+ *		See indexam.sgml for more info.
+ *
+ * Returns bulk delete stats derived from input stats
  */
-static void
-do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
+static IndexBulkDeleteResult *
+lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat,
+					  double reltuples, LVRelState *vacrel)
 {
-	/*
-	 * Increment the active worker count if we are able to launch any worker.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
-
-	/* Loop until all indexes are vacuumed */
-	for (;;)
-	{
-		int			idx;
-		LVSharedIndStats *shared_istat;
-		Relation	indrel;
-		IndexBulkDeleteResult *istat;
-
-		/* Get an index number to process */
-		idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1);
+	IndexVacuumInfo ivinfo;
+	PGRUsage	ru0;
+	LVSavedErrInfo saved_err_info;
 
-		/* Done for all indexes? */
-		if (idx >= vacrel->nindexes)
-			break;
+	pg_rusage_init(&ru0);
 
-		/* Get the index statistics space from DSM, if any */
-		shared_istat = parallel_stats_for_idx(lvshared, idx);
+	ivinfo.index = indrel;
+	ivinfo.analyze_only = false;
+	ivinfo.report_progress = false;
+	ivinfo.estimated_count = true;
+	ivinfo.message_level = elevel;
+	ivinfo.num_heap_tuples = reltuples;
+	ivinfo.strategy = vacrel->bstrategy;
 
-		/* Skip indexes not participating in parallelism */
-		if (shared_istat == NULL)
-			continue;
+	/*
+	 * Update error traceback information.
+	 *
+	 * The index name is saved during this phase and restored immediately
+	 * after this phase.  See vacuum_error_callback.
+	 */
+	Assert(vacrel->indname == NULL);
+	vacrel->indname = pstrdup(RelationGetRelationName(indrel));
+	update_vacuum_error_info(vacrel, &saved_err_info,
+							 VACUUM_ERRCB_PHASE_VACUUM_INDEX,
+							 InvalidBlockNumber, InvalidOffsetNumber);
 
-		indrel = vacrel->indrels[idx];
+	/* Do bulk deletion */
+	istat = index_bulk_delete(&ivinfo, istat, lazy_tid_reaped,
+							  (void *) vacrel->dead_items);
 
-		/*
-		 * Skip processing indexes that are unsafe for workers (these are
-		 * processed in do_serial_processing_for_unsafe_indexes() by leader)
-		 */
-		if (!parallel_processing_is_safe(indrel, lvshared))
-			continue;
+	ereport(elevel,
+			(errmsg("scanned index \"%s\" to remove %d row versions",
+					vacrel->indname, vacrel->dead_items->num_items),
+			 errdetail_internal("%s", pg_rusage_show(&ru0))));
 
-		/* Do vacuum or cleanup of the index */
-		istat = vacrel->indstats[idx];
-		vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
-														   lvshared,
-														   shared_istat,
-														   vacrel);
-	}
+	/* Revert to the previous phase information for error traceback */
+	restore_vacuum_error_info(vacrel, &saved_err_info);
+	pfree(vacrel->indname);
+	vacrel->indname = NULL;
 
-	/*
-	 * We have completed the index vacuum so decrement the active worker
-	 * count.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+	return istat;
 }
 
 /*
- * Perform parallel processing of indexes in leader process.
+ *	lazy_cleanup_one_index() -- do post-vacuum cleanup for index relation.
  *
- * Handles index vacuuming (or index cleanup) for indexes that are not
- * parallel safe.  It's possible that this will vary for a given index, based
- * on details like whether we're performing for_cleanup processing right now.
+ *		Calls index AM's amvacuumcleanup routine.  reltuples is the number
+ *		of heap tuples and estimated_count is true if reltuples is an
+ *		estimated value.  See indexam.sgml for more info.
  *
- * Also performs processing of smaller indexes that fell under the size cutoff
- * enforced by compute_parallel_vacuum_workers().  These indexes never get a
- * slot for statistics in DSM.
+ * Returns bulk delete stats derived from input stats
  */
-static void
-do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
+static IndexBulkDeleteResult *
+lazy_cleanup_one_index(Relation indrel, IndexBulkDeleteResult *istat,
+					   double reltuples, bool estimated_count,
+					   LVRelState *vacrel)
 {
-	Assert(!IsParallelWorker());
-
-	/*
-	 * Increment the active worker count if we are able to launch any worker.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+	IndexVacuumInfo ivinfo;
+	PGRUsage	ru0;
+	LVSavedErrInfo saved_err_info;
 
-	for (int idx = 0; idx < vacrel->nindexes; idx++)
-	{
-		LVSharedIndStats *shared_istat;
-		Relation	indrel;
-		IndexBulkDeleteResult *istat;
-
-		shared_istat = parallel_stats_for_idx(lvshared, idx);
-		indrel = vacrel->indrels[idx];
-
-		/*
-		 * We're only here for the indexes that parallel workers won't
-		 * process.  Note that the shared_istat test ensures that we process
-		 * indexes that fell under initial size cutoff.
-		 */
-		if (shared_istat != NULL &&
-			parallel_processing_is_safe(indrel, lvshared))
-			continue;
-
-		/* Do vacuum or cleanup of the index */
-		istat = vacrel->indstats[idx];
-		vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
-														   lvshared,
-														   shared_istat,
-														   vacrel);
-	}
-
-	/*
-	 * We have completed the index vacuum so decrement the active worker
-	 * count.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
-}
-
-/*
- * Vacuum or cleanup index either by leader process or by one of the worker
- * process.  After processing the index this function copies the index
- * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
- * segment.
- */
-static IndexBulkDeleteResult *
-parallel_process_one_index(Relation indrel,
-						   IndexBulkDeleteResult *istat,
-						   LVShared *lvshared,
-						   LVSharedIndStats *shared_istat,
-						   LVRelState *vacrel)
-{
-	IndexBulkDeleteResult *istat_res;
-
-	/*
-	 * Update the pointer to the corresponding bulk-deletion result if someone
-	 * has already updated it
-	 */
-	if (shared_istat && shared_istat->updated && istat == NULL)
-		istat = &shared_istat->istat;
-
-	/* Do vacuum or cleanup of the index */
-	if (lvshared->for_cleanup)
-		istat_res = lazy_cleanup_one_index(indrel, istat, lvshared->reltuples,
-										   lvshared->estimated_count, vacrel);
-	else
-		istat_res = lazy_vacuum_one_index(indrel, istat, lvshared->reltuples,
-										  vacrel);
-
-	/*
-	 * Copy the index bulk-deletion result returned from ambulkdelete and
-	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
-	 * allocate locally and it's possible that an index will be vacuumed by a
-	 * different vacuum process the next cycle.  Copying the result normally
-	 * happens only the first time an index is vacuumed.  For any additional
-	 * vacuum pass, we directly point to the result on the DSM segment and
-	 * pass it to vacuum index APIs so that workers can update it directly.
-	 *
-	 * Since all vacuum workers write the bulk-deletion result at different
-	 * slots we can write them without locking.
-	 */
-	if (shared_istat && !shared_istat->updated && istat_res != NULL)
-	{
-		memcpy(&shared_istat->istat, istat_res, sizeof(IndexBulkDeleteResult));
-		shared_istat->updated = true;
-
-		/* Free the locally-allocated bulk-deletion result */
-		pfree(istat_res);
-
-		/* return the pointer to the result from shared memory */
-		return &shared_istat->istat;
-	}
-
-	return istat_res;
-}
-
-/*
- *	lazy_cleanup_all_indexes() -- cleanup all indexes of relation.
- */
-static void
-lazy_cleanup_all_indexes(LVRelState *vacrel)
-{
-	Assert(!IsParallelWorker());
-	Assert(vacrel->nindexes > 0);
-
-	/* Report that we are now cleaning up indexes */
-	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
-
-	if (!ParallelVacuumIsActive(vacrel))
-	{
-		double		reltuples = vacrel->new_rel_tuples;
-		bool		estimated_count =
-		vacrel->tupcount_pages < vacrel->rel_pages;
-
-		for (int idx = 0; idx < vacrel->nindexes; idx++)
-		{
-			Relation	indrel = vacrel->indrels[idx];
-			IndexBulkDeleteResult *istat = vacrel->indstats[idx];
-
-			vacrel->indstats[idx] =
-				lazy_cleanup_one_index(indrel, istat, reltuples,
-									   estimated_count, vacrel);
-		}
-	}
-	else
-	{
-		/* Outsource everything to parallel variant */
-		do_parallel_lazy_cleanup_all_indexes(vacrel);
-	}
-}
-
-/*
- *	lazy_vacuum_one_index() -- vacuum index relation.
- *
- *		Delete all the index tuples containing a TID collected in
- *		vacrel->dead_items array.  Also update running statistics.
- *		Exact details depend on index AM's ambulkdelete routine.
- *
- *		reltuples is the number of heap tuples to be passed to the
- *		bulkdelete callback.  It's always assumed to be estimated.
- *		See indexam.sgml for more info.
- *
- * Returns bulk delete stats derived from input stats
- */
-static IndexBulkDeleteResult *
-lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat,
-					  double reltuples, LVRelState *vacrel)
-{
-	IndexVacuumInfo ivinfo;
-	PGRUsage	ru0;
-	LVSavedErrInfo saved_err_info;
-
-	pg_rusage_init(&ru0);
-
-	ivinfo.index = indrel;
-	ivinfo.analyze_only = false;
-	ivinfo.report_progress = false;
-	ivinfo.estimated_count = true;
-	ivinfo.message_level = elevel;
-	ivinfo.num_heap_tuples = reltuples;
-	ivinfo.strategy = vacrel->bstrategy;
-
-	/*
-	 * Update error traceback information.
-	 *
-	 * The index name is saved during this phase and restored immediately
-	 * after this phase.  See vacuum_error_callback.
-	 */
-	Assert(vacrel->indname == NULL);
-	vacrel->indname = pstrdup(RelationGetRelationName(indrel));
-	update_vacuum_error_info(vacrel, &saved_err_info,
-							 VACUUM_ERRCB_PHASE_VACUUM_INDEX,
-							 InvalidBlockNumber, InvalidOffsetNumber);
-
-	/* Do bulk deletion */
-	istat = index_bulk_delete(&ivinfo, istat, lazy_tid_reaped,
-							  (void *) vacrel->dead_items);
-
-	ereport(elevel,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
-					vacrel->indname, vacrel->dead_items->num_items),
-			 errdetail_internal("%s", pg_rusage_show(&ru0))));
-
-	/* Revert to the previous phase information for error traceback */
-	restore_vacuum_error_info(vacrel, &saved_err_info);
-	pfree(vacrel->indname);
-	vacrel->indname = NULL;
-
-	return istat;
-}
-
-/*
- *	lazy_cleanup_one_index() -- do post-vacuum cleanup for index relation.
- *
- *		Calls index AM's amvacuumcleanup routine.  reltuples is the number
- *		of heap tuples and estimated_count is true if reltuples is an
- *		estimated value.  See indexam.sgml for more info.
- *
- * Returns bulk delete stats derived from input stats
- */
-static IndexBulkDeleteResult *
-lazy_cleanup_one_index(Relation indrel, IndexBulkDeleteResult *istat,
-					   double reltuples, bool estimated_count,
-					   LVRelState *vacrel)
-{
-	IndexVacuumInfo ivinfo;
-	PGRUsage	ru0;
-	LVSavedErrInfo saved_err_info;
-
-	pg_rusage_init(&ru0);
+	pg_rusage_init(&ru0);
 
 	ivinfo.index = indrel;
 	ivinfo.analyze_only = false;
@@ -3520,7 +3190,7 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 								vacrel->relname)));
 		}
 		else
-			begin_parallel_vacuum(vacrel, nworkers);
+			parallel_vacuum_begin(vacrel, nworkers);
 
 		/* If parallel mode started, vacrel->dead_items allocated in DSM */
 		if (ParallelVacuumIsActive(vacrel))
@@ -3552,7 +3222,7 @@ dead_items_cleanup(LVRelState *vacrel)
 	 * End parallel mode before updating index statistics as we cannot write
 	 * during parallel mode.
 	 */
-	end_parallel_vacuum(vacrel);
+	parallel_vacuum_end(vacrel);
 }
 
 /*
@@ -3745,6 +3415,38 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 }
 
 /*
+ * Update index statistics in pg_class if the statistics are accurate.
+ */
+static void
+update_index_statistics(LVRelState *vacrel)
+{
+	Relation   *indrels = vacrel->indrels;
+	int			nindexes = vacrel->nindexes;
+	IndexBulkDeleteResult **indstats = vacrel->indstats;
+
+	Assert(!IsInParallelMode());
+
+	for (int idx = 0; idx < nindexes; idx++)
+	{
+		Relation	indrel = indrels[idx];
+		IndexBulkDeleteResult *istat = indstats[idx];
+
+		if (istat == NULL || istat->estimated_count)
+			continue;
+
+		/* Update index statistics */
+		vac_update_relstats(indrel,
+							istat->num_pages,
+							istat->num_index_tuples,
+							0,
+							false,
+							InvalidTransactionId,
+							InvalidMultiXactId,
+							false);
+	}
+}
+
+/*
  * Compute the number of parallel worker processes to request.  Both index
  * vacuum and index cleanup can be executed with parallel workers.  The index
  * is eligible for parallel vacuum iff its size is greater than
@@ -3758,7 +3460,7 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
  * vacuum.
  */
 static int
-compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
+parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
 								bool *will_parallel_vacuum)
 {
 	int			nindexes_parallel = 0;
@@ -3781,6 +3483,7 @@ compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
 		Relation	indrel = vacrel->indrels[idx];
 		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
 
+		/* Skip indexes that are not suitable targets for parallel index vacuum */
 		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
 			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
 			continue;
@@ -3815,38 +3518,6 @@ compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
 }
 
 /*
- * Update index statistics in pg_class if the statistics are accurate.
- */
-static void
-update_index_statistics(LVRelState *vacrel)
-{
-	Relation   *indrels = vacrel->indrels;
-	int			nindexes = vacrel->nindexes;
-	IndexBulkDeleteResult **indstats = vacrel->indstats;
-
-	Assert(!IsInParallelMode());
-
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		Relation	indrel = indrels[idx];
-		IndexBulkDeleteResult *istat = indstats[idx];
-
-		if (istat == NULL || istat->estimated_count)
-			continue;
-
-		/* Update index statistics */
-		vac_update_relstats(indrel,
-							istat->num_pages,
-							istat->num_index_tuples,
-							0,
-							false,
-							InvalidTransactionId,
-							InvalidMultiXactId,
-							false);
-	}
-}
-
-/*
  * Try to enter parallel mode and create a parallel context.  Then initialize
  * shared memory state.
  *
@@ -3855,7 +3526,7 @@ update_index_statistics(LVRelState *vacrel)
  * VACUUM is currently active.
  */
 static void
-begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
+parallel_vacuum_begin(LVRelState *vacrel, int nrequested)
 {
 	LVParallelState *lps;
 	Relation   *indrels = vacrel->indrels;
@@ -3863,10 +3534,12 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
 	ParallelContext *pcxt;
 	LVShared   *shared;
 	LVDeadItems *dead_items;
+	LVParallelIndStats *pindstats;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
 	bool	   *will_parallel_vacuum;
 	int			max_items;
+	Size		est_pindstats_len;
 	Size		est_shared_len;
 	Size		est_dead_items_len;
 	int			nindexes_mwm = 0;
@@ -3884,8 +3557,7 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
 	 * Compute the number of parallel vacuum workers to launch
 	 */
 	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
-	parallel_workers = compute_parallel_vacuum_workers(vacrel,
-													   nrequested,
+	parallel_workers = parallel_vacuum_compute_workers(vacrel, nrequested,
 													   will_parallel_vacuum);
 	if (parallel_workers <= 0)
 	{
@@ -3901,50 +3573,23 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
 								 parallel_workers);
 	Assert(pcxt->nworkers > 0);
 	lps->pcxt = pcxt;
+	lps->will_parallel_vacuum = will_parallel_vacuum;
 
-	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
-	est_shared_len = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		Relation	indrel = indrels[idx];
-		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
+	est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
-		/*
-		 * Cleanup option should be either disabled, always performing in
-		 * parallel or conditionally performing in parallel.
-		 */
-		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
-			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
-		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+	est_shared_len = sizeof(LVShared);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
-		/* Skip indexes that don't participate in parallel vacuum */
-		if (!will_parallel_vacuum[idx])
-			continue;
-
-		if (indrel->rd_indam->amusemaintenanceworkmem)
-			nindexes_mwm++;
-
-		est_shared_len = add_size(est_shared_len, sizeof(LVSharedIndStats));
-
-		/*
-		 * Remember the number of indexes that support parallel operation for
-		 * each phase.
-		 */
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
-			lps->nindexes_parallel_bulkdel++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
-			lps->nindexes_parallel_cleanup++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
-			lps->nindexes_parallel_condcleanup++;
-	}
-	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-	/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
-	max_items = dead_items_max_items(vacrel);
-	est_dead_items_len = MAXALIGN(max_items_to_alloc_size(max_items));
-	shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
+	max_items = dead_items_max_items(vacrel);
+	est_dead_items_len = max_items_to_alloc_size(max_items);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
 	/*
 	 * Estimate space for BufferUsage and WalUsage --
@@ -3973,6 +3618,42 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
 
 	InitializeParallelDSM(pcxt);
 
+	/* Prepare index vacuum stats */
+	pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len);
+	MemSet(pindstats, 0, est_pindstats_len);
+	for (int idx = 0; idx < nindexes; idx++)
+	{
+		Relation	indrel = indrels[idx];
+		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+		/*
+		 * Cleanup option should be either disabled, always performing in
+		 * parallel or conditionally performing in parallel.
+		 */
+		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
+			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
+		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+
+		if (!will_parallel_vacuum[idx])
+			continue;
+
+		if (indrel->rd_indam->amusemaintenanceworkmem)
+			nindexes_mwm++;
+
+		/*
+		 * Remember the number of indexes that support parallel operation for
+		 * each phase.
+		 */
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+			lps->nindexes_parallel_bulkdel++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
+			lps->nindexes_parallel_cleanup++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
+			lps->nindexes_parallel_condcleanup++;
+	}
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats);
+	lps->lvpindstats = pindstats;
+
 	/* Prepare shared information */
 	shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
 	MemSet(shared, 0, est_shared_len);
@@ -3986,21 +3666,6 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
 	pg_atomic_init_u32(&(shared->cost_balance), 0);
 	pg_atomic_init_u32(&(shared->active_nworkers), 0);
 	pg_atomic_init_u32(&(shared->idx), 0);
-	shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
-
-	/*
-	 * Initialize variables for shared index statistics, set NULL bitmap and
-	 * the size of stats for each index.
-	 */
-	memset(shared->bitmap, 0x00, BITMAPLEN(nindexes));
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		if (!will_parallel_vacuum[idx])
-			continue;
-
-		/* Set NOT NULL as this index does support parallelism */
-		shared->bitmap[idx >> 3] |= 1 << (idx & 0x07);
-	}
 
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	lps->lvshared = shared;
@@ -4038,8 +3703,6 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
 					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
 	}
 
-	pfree(will_parallel_vacuum);
-
 	/* Success -- set dead_items and lps in leader's vacrel state */
 	vacrel->dead_items = dead_items;
 	vacrel->lps = lps;
@@ -4055,7 +3718,7 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
  * context, but that won't be safe (see ExitParallelMode).
  */
 static void
-end_parallel_vacuum(LVRelState *vacrel)
+parallel_vacuum_end(LVRelState *vacrel)
 {
 	IndexBulkDeleteResult **indstats = vacrel->indstats;
 	LVParallelState *lps = vacrel->lps;
@@ -4066,21 +3729,12 @@ end_parallel_vacuum(LVRelState *vacrel)
 	/* Copy the updated statistics */
 	for (int idx = 0; idx < nindexes; idx++)
 	{
-		LVSharedIndStats *shared_istat;
-
-		shared_istat = parallel_stats_for_idx(lps->lvshared, idx);
-
-		/*
-		 * Skip index -- it must have been processed by the leader, from
-		 * inside do_serial_processing_for_unsafe_indexes()
-		 */
-		if (shared_istat == NULL)
-			continue;
+		LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
 
-		if (shared_istat->updated)
+		if (pindstats->istat_updated)
 		{
 			indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-			memcpy(indstats[idx], &shared_istat->istat, sizeof(IndexBulkDeleteResult));
+			memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult));
 		}
 		else
 			indstats[idx] = NULL;
@@ -4090,74 +3744,385 @@ end_parallel_vacuum(LVRelState *vacrel)
 	ExitParallelMode();
 
 	/* Deactivate parallel vacuum */
+	pfree(lps->will_parallel_vacuum);
 	pfree(lps);
 	vacrel->lps = NULL;
 }
 
 /*
- * Return shared memory statistics for index at offset 'getidx', if any
- *
- * Returning NULL indicates that compute_parallel_vacuum_workers() determined
- * that the index is a totally unsuitable target for all parallel processing
- * up front.  For example, the index could be < min_parallel_index_scan_size
- * cutoff.
+ * Returns false if the given index can't participate in the next execution of
+ * parallel index vacuum or parallel index cleanup.
  */
-static LVSharedIndStats *
-parallel_stats_for_idx(LVShared *lvshared, int getidx)
+static bool
+parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
+									   bool vacuum)
 {
-	char	   *p;
+	uint8		vacoptions;
 
-	if (IndStatsIsNull(lvshared, getidx))
-		return NULL;
+	vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+	/* In the parallel vacuum case, check if it supports parallel bulk-deletion */
+	if (vacuum)
+		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
+
+	/* Not safe if the index does not support parallel cleanup */
+	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
+		return false;
+
+	/*
+	 * Not safe if the index supports parallel cleanup conditionally, but we
+	 * have already processed the index (for bulkdelete).  We do this to avoid
+	 * the need to invoke workers when parallel index cleanup doesn't need to
+	 * scan the index.  See the comments for option
+	 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
+	 * parallel cleanup conditionally.
+	 */
+	if (vacrel->num_index_scans > 0 &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+		return false;
+
+	return true;
+}
 
-	p = (char *) GetSharedIndStats(lvshared);
-	for (int idx = 0; idx < getidx; idx++)
+/*
+ * Perform index vacuum or index cleanup with parallel workers.  This function
+ * must be used by the parallel vacuum leader process.
+ */
+static void
+parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum)
+{
+	LVParallelState *lps = vacrel->lps;
+	LVParallelIndVacStatus new_status;
+	int			nworkers;
+
+	Assert(!IsParallelWorker());
+	Assert(ParallelVacuumIsActive(vacrel));
+	Assert(vacrel->nindexes > 0);
+
+	if (vacuum)
 	{
-		if (IndStatsIsNull(lvshared, idx))
-			continue;
+		/*
+		 * We can only provide an approximate value of num_heap_tuples, at
+		 * least for now.  Matches serial VACUUM case.
+		 */
+		vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
+		vacrel->lps->lvshared->estimated_count = true;
+
+		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
+
+		/* Determine the number of parallel workers to launch */
+		nworkers = vacrel->lps->nindexes_parallel_bulkdel;
+	}
+	else
+	{
+		/*
+		 * We can provide a better estimate of total number of surviving
+		 * tuples (we assume indexes are more interested in that than in the
+		 * number of nominally live tuples).
+		 */
+		vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
+		vacrel->lps->lvshared->estimated_count =
+			(vacrel->tupcount_pages < vacrel->rel_pages);
+
+		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
+
+		/* Determine the number of parallel workers to launch */
+		nworkers = vacrel->lps->nindexes_parallel_cleanup;
+
+		/* Add conditionally parallel-aware indexes if this is the first call */
+		if (vacrel->num_index_scans == 0)
+			nworkers += vacrel->lps->nindexes_parallel_condcleanup;
+	}
+
+	/* The leader process will participate */
+	nworkers--;
 
-		p += sizeof(LVSharedIndStats);
+	/*
+	 * It is possible that parallel context is initialized with fewer workers
+	 * than the number of indexes that need a separate worker in the current
+	 * phase, so we need to consider it.  See
+	 * parallel_vacuum_compute_workers().
+	 */
+	nworkers = Min(nworkers, lps->pcxt->nworkers);
+
+	/*
+	 * Set each index's vacuum status and mark whether parallel vacuum
+	 * workers can process it.
+	 */
+	for (int i = 0; i < vacrel->nindexes; i++)
+	{
+		LVParallelIndStats *pindstats = &(vacrel->lps->lvpindstats[i]);
+
+		Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
+		pindstats->status = new_status;
+		pindstats->parallel_workers_can_process =
+			(lps->will_parallel_vacuum[i] &&
+			 parallel_vacuum_index_is_parallel_safe(vacrel, vacrel->indrels[i],
+													vacuum));
 	}
 
-	return (LVSharedIndStats *) p;
+	/* Reset the parallel index processing counter */
+	pg_atomic_write_u32(&(lps->lvshared->idx), 0);
+
+	/* Setup the shared cost-based vacuum delay and launch workers */
+	if (nworkers > 0)
+	{
+		/* Reinitialize parallel context to relaunch parallel workers */
+		if (vacrel->num_index_scans > 0)
+			ReinitializeParallelDSM(lps->pcxt);
+
+		/*
+		 * Set up shared cost balance and the number of active workers for
+		 * vacuum delay.  We need to do this before launching workers as
+		 * otherwise, they might not see the updated values for these
+		 * parameters.
+		 */
+		pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
+		pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0);
+
+		/*
+		 * The number of workers can vary between bulkdelete and cleanup
+		 * phase.
+		 */
+		ReinitializeParallelWorkers(lps->pcxt, nworkers);
+
+		LaunchParallelWorkers(lps->pcxt);
+
+		if (lps->pcxt->nworkers_launched > 0)
+		{
+			/*
+			 * Reset the local cost values for leader backend as we have
+			 * already accumulated the remaining balance of heap.
+			 */
+			VacuumCostBalance = 0;
+			VacuumCostBalanceLocal = 0;
+
+			/* Enable shared cost balance for leader backend */
+			VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
+			VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
+		}
+
+		if (vacuum)
+			ereport(elevel,
+					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+									 lps->pcxt->nworkers_launched),
+							lps->pcxt->nworkers_launched, nworkers)));
+		else
+			ereport(elevel,
+					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+									 lps->pcxt->nworkers_launched),
+							lps->pcxt->nworkers_launched, nworkers)));
+	}
+
+	/* Process the indexes that can be processed by only leader process */
+	parallel_vacuum_process_unsafe_indexes(vacrel);
+
+	/*
+	 * Join as a parallel worker.  The leader process alone processes all
+	 * parallel-safe indexes in the case where no workers are launched.
+	 */
+	parallel_vacuum_process_safe_indexes(vacrel, lps->lvshared, lps->lvpindstats);
+
+	/*
+	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
+	 * to finish, or we might get incomplete data.)
+	 */
+	if (nworkers > 0)
+	{
+		/* Wait for all vacuum workers to finish */
+		WaitForParallelWorkersToFinish(lps->pcxt);
+
+		for (int i = 0; i < lps->pcxt->nworkers_launched; i++)
+			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
+	}
+
+	/*
+	 * Reset all index statuses back to initial (while checking that we have
+	 * processed all indexes).
+	 */
+	for (int i = 0; i < vacrel->nindexes; i++)
+	{
+		LVParallelIndStats *pindstats = &(lps->lvpindstats[i]);
+
+		if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
+			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
+				 RelationGetRelationName(vacrel->indrels[i]));
+
+		pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
+	}
+
+	/*
+	 * Carry the shared balance value to heap scan and disable shared costing
+	 */
+	if (VacuumSharedCostBalance)
+	{
+		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+		VacuumSharedCostBalance = NULL;
+		VacuumActiveNWorkers = NULL;
+	}
 }
 
 /*
- * Returns false, if the given index can't participate in parallel index
- * vacuum or parallel index cleanup
+ * Index vacuum/cleanup routine used by the leader process and parallel
+ * vacuum worker processes to process the indexes in parallel.
  */
-static bool
-parallel_processing_is_safe(Relation indrel, LVShared *lvshared)
+static void
+parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
+									 LVParallelIndStats *pindstats)
 {
-	uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-	/* first_time must be true only if for_cleanup is true */
-	Assert(lvshared->for_cleanup || !lvshared->first_time);
+	/*
+	 * Increment the active worker count if we are able to launch any worker.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
 
-	if (lvshared->for_cleanup)
+	/* Loop until all indexes are vacuumed */
+	for (;;)
 	{
-		/* Skip, if the index does not support parallel cleanup */
-		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
-			return false;
+		int			idx;
+		LVParallelIndStats *pis;
+
+		/* Get an index number to process */
+		idx = pg_atomic_fetch_add_u32(&(shared->idx), 1);
+
+		/* Done for all indexes? */
+		if (idx >= vacrel->nindexes)
+			break;
+
+		pis = &(pindstats[idx]);
 
 		/*
-		 * Skip, if the index supports parallel cleanup conditionally, but we
-		 * have already processed the index (for bulkdelete).  See the
-		 * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
-		 * when indexes support parallel cleanup conditionally.
+		 * Skip processing indexes that are unsafe for workers or are
+		 * unsuitable targets for parallel index vacuum (these are processed
+		 * in parallel_vacuum_process_unsafe_indexes() by the leader).
 		 */
-		if (!lvshared->first_time &&
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-			return false;
+		if (!pis->parallel_workers_can_process)
+			continue;
+
+		/* Do vacuum or cleanup of the index */
+		parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
+										  shared, pis);
+	}
+
+	/*
+	 * We have completed the index vacuum so decrement the active worker
+	 * count.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Perform parallel processing of indexes in leader process.
+ *
+ * Handles index vacuuming (or index cleanup) for indexes that are not
+ * parallel safe.  It's possible that this will vary for a given index, based
+ * on details like whether we're performing index cleanup right now.
+ *
+ * Also performs processing of smaller indexes that fell under the size cutoff
+ * enforced by parallel_vacuum_compute_workers().
+ */
+static void
+parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel)
+{
+	LVParallelState *lps = vacrel->lps;
+
+	Assert(!IsParallelWorker());
+
+	/*
+	 * Increment the active worker count if we are able to launch any worker.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+	for (int idx = 0; idx < vacrel->nindexes; idx++)
+	{
+		LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
+
+		/* Skip indexes that are safe for workers */
+		if (pindstats->parallel_workers_can_process)
+			continue;
+
+		/* Do vacuum or cleanup of the index */
+		parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
+										  lps->lvshared, pindstats);
 	}
-	else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0)
+
+	/*
+	 * We have completed the index vacuum so decrement the active worker
+	 * count.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Vacuum or cleanup one index, either by the leader process or by one of the
+ * worker processes.  After processing the index this function copies the index
+ * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
+ * segment.
+ */
+static void
+parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
+								  LVShared *shared, LVParallelIndStats *pindstats)
+{
+	IndexBulkDeleteResult *istat = NULL;
+	IndexBulkDeleteResult *istat_res;
+
+	/*
+	 * Update the pointer to the corresponding bulk-deletion result if someone
+	 * has already updated it
+	 */
+	if (pindstats->istat_updated)
+		istat = &(pindstats->istat);
+
+	switch (pindstats->status)
 	{
-		/* Skip if the index does not support parallel bulk deletion */
-		return false;
+		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
+			istat_res = lazy_vacuum_one_index(indrel, istat,
+											  shared->reltuples, vacrel);
+			break;
+		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
+			istat_res = lazy_cleanup_one_index(indrel, istat,
+											   shared->reltuples,
+											   shared->estimated_count,
+											   vacrel);
+			break;
+		default:
+			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
+				 pindstats->status,
+				 RelationGetRelationName(indrel));
 	}
 
-	return true;
+	/*
+	 * Copy the index bulk-deletion result returned from ambulkdelete and
+	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
+	 * allocate locally and it's possible that an index will be vacuumed by a
+	 * different vacuum process the next cycle.  Copying the result normally
+	 * happens only the first time an index is vacuumed.  For any additional
+	 * vacuum pass, we directly point to the result on the DSM segment and
+	 * pass it to vacuum index APIs so that workers can update it directly.
+	 *
+	 * Since all vacuum workers write the bulk-deletion result at different
+	 * slots we can write them without locking.
+	 */
+	if (!pindstats->istat_updated && istat_res != NULL)
+	{
+		memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
+		pindstats->istat_updated = true;
+
+		/* Free the locally-allocated bulk-deletion result */
+		pfree(istat_res);
+	}
+
+	/*
+	 * Update the status to completed. No need to lock here since each worker
+	 * touches different indexes.
+	 */
+	pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
 }
 
 /*
@@ -4171,6 +4136,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 {
 	Relation	rel;
 	Relation   *indrels;
+	LVParallelIndStats *lvpindstats;
 	LVShared   *lvshared;
 	LVDeadItems *dead_items;
 	BufferUsage *buffer_usage;
@@ -4190,10 +4156,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 										   false);
 	elevel = lvshared->elevel;
 
-	if (lvshared->for_cleanup)
-		elog(DEBUG1, "starting parallel vacuum worker for cleanup");
-	else
-		elog(DEBUG1, "starting parallel vacuum worker for bulk delete");
+	elog(DEBUG1, "starting parallel vacuum worker");
 
 	/* Set debug_query_string for individual workers */
 	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
@@ -4214,6 +4177,11 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
 	Assert(nindexes > 0);
 
+	/* Set index statistics */
+	lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc,
+														PARALLEL_VACUUM_KEY_INDEX_STATS,
+														false);
+
 	/* Set dead_items space (set as worker's vacrel dead_items below) */
 	dead_items = (LVDeadItems *) shm_toc_lookup(toc,
 												PARALLEL_VACUUM_KEY_DEAD_ITEMS,
@@ -4259,7 +4227,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	InstrStartParallelQuery();
 
 	/* Process indexes to perform vacuum/cleanup */
-	do_parallel_processing(&vacrel, lvshared);
+	parallel_vacuum_process_safe_indexes(&vacrel, lvshared, lvpindstats);
 
 	/* Report buffer/WAL usage during parallel execution */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f41ef0d..0c61ccb 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1307,6 +1307,8 @@ LSEG
 LUID
 LVDeadTuples
 LVPagePruneState
+LVParallelIndStats
+LVParallelIndVacStatus
 LVParallelState
 LVRelState
 LVSavedErrInfo
-- 
1.8.3.1

