diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 558cc88a08..8cde368a10 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -24,18 +24,9 @@
  *
  * Lazy vacuum supports parallel execution with parallel worker processes.  In
  * a parallel vacuum, we perform both index vacuum and index cleanup with
- * parallel worker processes.  Individual indexes are processed by one vacuum
- * process.  At the beginning of a lazy vacuum (at lazy_scan_heap) we prepare
- * the parallel context and initialize the DSM segment that contains shared
- * information as well as the memory space for storing dead tuples.  When
- * starting either index vacuum or index cleanup, we launch parallel worker
- * processes.  Once all indexes are processed the parallel worker processes
- * exit.  After that, the leader process re-initializes the parallel context
- * so that it can use the same DSM for multiple passes of index vacuum and
- * for performing index cleanup.  For updating the index statistics, we need
- * to update the system table and since updates are not allowed during
- * parallel mode we update the index statistics after exiting from the
- * parallel mode.
+ * parallel worker processes.  Updating the index statistics requires writing
+ * to the system catalogs, and since such updates are not allowed during
+ * parallel mode, we update the index statistics after exiting parallel mode.
  *
  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -143,22 +134,11 @@
  */
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
-/*
- * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
- * we don't need to worry about DSM keys conflicting with plan_node_id we can
- * use small integers.
- */
-#define PARALLEL_VACUUM_KEY_SHARED			1
-#define PARALLEL_VACUUM_KEY_DEAD_TUPLES		2
-#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
-#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
-#define PARALLEL_VACUUM_KEY_WAL_USAGE		5
-
 /*
  * Macro to check if we are in a parallel vacuum.  If true, we are in the
  * parallel mode and the DSM segment is initialized.
  */
-#define ParallelVacuumIsActive(vacrel) ((vacrel)->lps != NULL)
+#define ParallelVacuumIsActive(vacrel) ((vacrel)->pvc != NULL)
 
 /* Phases of vacuum during which we report error context. */
 typedef enum
@@ -171,137 +151,6 @@ typedef enum
 	VACUUM_ERRCB_PHASE_TRUNCATE
 } VacErrPhase;
 
-/*
- * LVDeadTuples stores the dead tuple TIDs collected during the heap scan.
- * This is allocated in the DSM segment in parallel mode and in local memory
- * in non-parallel mode.
- */
-typedef struct LVDeadTuples
-{
-	int			max_tuples;		/* # slots allocated in array */
-	int			num_tuples;		/* current # of entries */
-	/* List of TIDs of tuples we intend to delete */
-	/* NB: this list is ordered by TID address */
-	ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER];	/* array of
-														 * ItemPointerData */
-} LVDeadTuples;
-
-/* The dead tuple space consists of LVDeadTuples and dead tuple TIDs */
-#define SizeOfDeadTuples(cnt) \
-	add_size(offsetof(LVDeadTuples, itemptrs), \
-			 mul_size(sizeof(ItemPointerData), cnt))
-#define MAXDEADTUPLES(max_size) \
-		(((max_size) - offsetof(LVDeadTuples, itemptrs)) / sizeof(ItemPointerData))
-
-/*
- * Shared information among parallel workers.  So this is allocated in the DSM
- * segment.
- */
-typedef struct LVShared
-{
-	/*
-	 * Target table relid and log level.  These fields are not modified during
-	 * the lazy vacuum.
-	 */
-	Oid			relid;
-	int			elevel;
-
-	/*
-	 * An indication for vacuum workers to perform either index vacuum or
-	 * index cleanup.  first_time is true only if for_cleanup is true and
-	 * bulk-deletion is not performed yet.
-	 */
-	bool		for_cleanup;
-	bool		first_time;
-
-	/*
-	 * Fields for both index vacuum and cleanup.
-	 *
-	 * reltuples is the total number of input heap tuples.  We set either old
-	 * live tuples in the index vacuum case or the new live tuples in the
-	 * index cleanup case.
-	 *
-	 * estimated_count is true if reltuples is an estimated value.  (Note that
-	 * reltuples could be -1 in this case, indicating we have no idea.)
-	 */
-	double		reltuples;
-	bool		estimated_count;
-
-	/*
-	 * In single process lazy vacuum we could consume more memory during index
-	 * vacuuming or cleanup apart from the memory for heap scanning.  In
-	 * parallel vacuum, since individual vacuum workers can consume memory
-	 * equal to maintenance_work_mem, the new maintenance_work_mem for each
-	 * worker is set such that the parallel operation doesn't consume more
-	 * memory than single process lazy vacuum.
-	 */
-	int			maintenance_work_mem_worker;
-
-	/*
-	 * Shared vacuum cost balance.  During parallel vacuum,
-	 * VacuumSharedCostBalance points to this value and it accumulates the
-	 * balance of each parallel vacuum worker.
-	 */
-	pg_atomic_uint32 cost_balance;
-
-	/*
-	 * Number of active parallel workers.  This is used for computing the
-	 * minimum threshold of the vacuum cost balance before a worker sleeps for
-	 * cost-based delay.
-	 */
-	pg_atomic_uint32 active_nworkers;
-
-	/*
-	 * Variables to control parallel vacuum.  We have a bitmap to indicate
-	 * which index has stats in shared memory.  The set bit in the map
-	 * indicates that the particular index supports a parallel vacuum.
-	 */
-	pg_atomic_uint32 idx;		/* counter for vacuuming and clean up */
-	uint32		offset;			/* sizeof header incl. bitmap */
-	bits8		bitmap[FLEXIBLE_ARRAY_MEMBER];	/* bit map of NULLs */
-
-	/* Shared index statistics data follows at end of struct */
-} LVShared;
-
-#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
-#define GetSharedIndStats(s) \
-	((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
-#define IndStatsIsNull(s, i) \
-	(!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
-
-/*
- * Struct for an index bulk-deletion statistic used for parallel vacuum.  This
- * is allocated in the DSM segment.
- */
-typedef struct LVSharedIndStats
-{
-	bool		updated;		/* are the stats updated? */
-	IndexBulkDeleteResult istat;
-} LVSharedIndStats;
-
-/* Struct for maintaining a parallel vacuum state. */
-typedef struct LVParallelState
-{
-	ParallelContext *pcxt;
-
-	/* Shared information among parallel vacuum workers */
-	LVShared   *lvshared;
-
-	/* Points to buffer usage area in DSM */
-	BufferUsage *buffer_usage;
-
-	/* Points to WAL usage area in DSM */
-	WalUsage   *wal_usage;
-
-	/*
-	 * The number of indexes that support parallel index bulk-deletion and
-	 * parallel index cleanup respectively.
-	 */
-	int			nindexes_parallel_bulkdel;
-	int			nindexes_parallel_cleanup;
-	int			nindexes_parallel_condcleanup;
-} LVParallelState;
-
 typedef struct LVRelState
 {
 	/* Target heap relation and its indexes */
@@ -321,7 +170,7 @@ typedef struct LVRelState
 
 	/* Buffer access strategy and parallel state */
 	BufferAccessStrategy bstrategy;
-	LVParallelState *lps;
+	ParallelVacuumContext *pvc;
 
 	/* rel's initial relfrozenxid and relminmxid */
 	TransactionId relfrozenxid;
@@ -345,7 +194,7 @@ typedef struct LVRelState
 	/*
 	 * State managed by lazy_scan_heap() follows
 	 */
-	LVDeadTuples *dead_tuples;	/* items to vacuum from indexes */
+	VacDeadTuples *dead_tuples;	/* items to vacuum from indexes */
 	BlockNumber rel_pages;		/* total number of pages */
 	BlockNumber scanned_pages;	/* number of pages we examined */
 	BlockNumber pinskipped_pages;	/* # of pages skipped due to a pin */
@@ -416,18 +265,6 @@ static int	lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
 									LVRelState *vacrel);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
-static void do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel);
-static void do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel);
-static void do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers);
-static void do_parallel_processing(LVRelState *vacrel,
-								   LVShared *lvshared);
-static void do_serial_processing_for_unsafe_indexes(LVRelState *vacrel,
-													LVShared *lvshared);
-static IndexBulkDeleteResult *parallel_process_one_index(Relation indrel,
-														 IndexBulkDeleteResult *istat,
-														 LVShared *lvshared,
-														 LVSharedIndStats *shared_indstats,
-														 LVRelState *vacrel);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
 													IndexBulkDeleteResult *istat,
@@ -446,20 +283,9 @@ static long compute_max_dead_tuples(BlockNumber relblocks, bool hasindex);
 static void lazy_space_alloc(LVRelState *vacrel, int nworkers,
 							 BlockNumber relblocks);
 static void lazy_space_free(LVRelState *vacrel);
-static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
-static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
-static int	compute_parallel_vacuum_workers(LVRelState *vacrel,
-											int nrequested,
-											bool *will_parallel_vacuum);
 static void update_index_statistics(LVRelState *vacrel);
-static LVParallelState *begin_parallel_vacuum(LVRelState *vacrel,
-											  BlockNumber nblocks,
-											  int nrequested);
-static void end_parallel_vacuum(LVRelState *vacrel);
-static LVSharedIndStats *parallel_stats_for_idx(LVShared *lvshared, int getidx);
-static bool parallel_processing_is_safe(Relation indrel, LVShared *lvshared);
 static void vacuum_error_callback(void *arg);
 static void update_vacuum_error_info(LVRelState *vacrel,
 									 LVSavedErrInfo *saved_vacrel,
@@ -905,7 +731,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 {
-	LVDeadTuples *dead_tuples;
+	VacDeadTuples *dead_tuples;
 	BlockNumber nblocks,
 				blkno,
 				next_unskippable_block,
@@ -2039,7 +1865,7 @@ retry:
 	 */
 	if (lpdead_items > 0)
 	{
-		LVDeadTuples *dead_tuples = vacrel->dead_tuples;
+		VacDeadTuples *dead_tuples = vacrel->dead_tuples;
 		ItemPointerData tmp;
 
 		Assert(!prunestate->all_visible);
@@ -2251,7 +2077,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	else
 	{
 		/* Outsource everything to parallel variant */
-		do_parallel_lazy_vacuum_all_indexes(vacrel);
+		perform_parallel_index_bulkdel(vacrel->pvc, vacrel->old_live_tuples);
 
 		/*
 		 * Do a postcheck to consider applying wraparound failsafe now.  Note
@@ -2404,7 +2230,7 @@ static int
 lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 					  int tupindex, Buffer *vmbuffer)
 {
-	LVDeadTuples *dead_tuples = vacrel->dead_tuples;
+	VacDeadTuples *dead_tuples = vacrel->dead_tuples;
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber unused[MaxHeapTuplesPerPage];
 	int			uncnt = 0;
@@ -2625,351 +2451,6 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 	return false;
 }
 
-/*
- * Perform lazy_vacuum_all_indexes() steps in parallel
- */
-static void
-do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel)
-{
-	/* Tell parallel workers to do index vacuuming */
-	vacrel->lps->lvshared->for_cleanup = false;
-	vacrel->lps->lvshared->first_time = false;
-
-	/*
-	 * We can only provide an approximate value of num_heap_tuples, at least
-	 * for now.  Matches serial VACUUM case.
-	 */
-	vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
-	vacrel->lps->lvshared->estimated_count = true;
-
-	do_parallel_vacuum_or_cleanup(vacrel,
-								  vacrel->lps->nindexes_parallel_bulkdel);
-}
-
-/*
- * Perform lazy_cleanup_all_indexes() steps in parallel
- */
-static void
-do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel)
-{
-	int			nworkers;
-
-	/*
-	 * If parallel vacuum is active we perform index cleanup with parallel
-	 * workers.
-	 *
-	 * Tell parallel workers to do index cleanup.
-	 */
-	vacrel->lps->lvshared->for_cleanup = true;
-	vacrel->lps->lvshared->first_time = (vacrel->num_index_scans == 0);
-
-	/*
-	 * Now we can provide a better estimate of total number of surviving
-	 * tuples (we assume indexes are more interested in that than in the
-	 * number of nominally live tuples).
-	 */
-	vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
-	vacrel->lps->lvshared->estimated_count =
-		(vacrel->tupcount_pages < vacrel->rel_pages);
-
-	/* Determine the number of parallel workers to launch */
-	if (vacrel->lps->lvshared->first_time)
-		nworkers = vacrel->lps->nindexes_parallel_cleanup +
-			vacrel->lps->nindexes_parallel_condcleanup;
-	else
-		nworkers = vacrel->lps->nindexes_parallel_cleanup;
-
-	do_parallel_vacuum_or_cleanup(vacrel, nworkers);
-}
-
-/*
- * Perform index vacuum or index cleanup with parallel workers.  This function
- * must be used by the parallel vacuum leader process.  The caller must set
- * lps->lvshared->for_cleanup to indicate whether to perform vacuum or
- * cleanup.
- */
-static void
-do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
-{
-	LVParallelState *lps = vacrel->lps;
-
-	Assert(!IsParallelWorker());
-	Assert(ParallelVacuumIsActive(vacrel));
-	Assert(vacrel->nindexes > 0);
-
-	/* The leader process will participate */
-	nworkers--;
-
-	/*
-	 * It is possible that parallel context is initialized with fewer workers
-	 * than the number of indexes that need a separate worker in the current
-	 * phase, so we need to consider it.  See compute_parallel_vacuum_workers.
-	 */
-	nworkers = Min(nworkers, lps->pcxt->nworkers);
-
-	/* Setup the shared cost-based vacuum delay and launch workers */
-	if (nworkers > 0)
-	{
-		if (vacrel->num_index_scans > 0)
-		{
-			/* Reset the parallel index processing counter */
-			pg_atomic_write_u32(&(lps->lvshared->idx), 0);
-
-			/* Reinitialize the parallel context to relaunch parallel workers */
-			ReinitializeParallelDSM(lps->pcxt);
-		}
-
-		/*
-		 * Set up shared cost balance and the number of active workers for
-		 * vacuum delay.  We need to do this before launching workers as
-		 * otherwise, they might not see the updated values for these
-		 * parameters.
-		 */
-		pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
-		pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0);
-
-		/*
-		 * The number of workers can vary between bulkdelete and cleanup
-		 * phase.
-		 */
-		ReinitializeParallelWorkers(lps->pcxt, nworkers);
-
-		LaunchParallelWorkers(lps->pcxt);
-
-		if (lps->pcxt->nworkers_launched > 0)
-		{
-			/*
-			 * Reset the local cost values for leader backend as we have
-			 * already accumulated the remaining balance of heap.
-			 */
-			VacuumCostBalance = 0;
-			VacuumCostBalanceLocal = 0;
-
-			/* Enable shared cost balance for leader backend */
-			VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
-			VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
-		}
-
-		if (lps->lvshared->for_cleanup)
-			ereport(elevel,
-					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
-									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
-									 lps->pcxt->nworkers_launched),
-							lps->pcxt->nworkers_launched, nworkers)));
-		else
-			ereport(elevel,
-					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
-									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
-									 lps->pcxt->nworkers_launched),
-							lps->pcxt->nworkers_launched, nworkers)));
-	}
-
-	/* Process the indexes that can be processed by only leader process */
-	do_serial_processing_for_unsafe_indexes(vacrel, lps->lvshared);
-
-	/*
-	 * Join as a parallel worker.  The leader process alone processes all the
-	 * indexes in the case where no workers are launched.
-	 */
-	do_parallel_processing(vacrel, lps->lvshared);
-
-	/*
-	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
-	 * to finish, or we might get incomplete data.)
-	 */
-	if (nworkers > 0)
-	{
-		/* Wait for all vacuum workers to finish */
-		WaitForParallelWorkersToFinish(lps->pcxt);
-
-		for (int i = 0; i < lps->pcxt->nworkers_launched; i++)
-			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
-	}
-
-	/*
-	 * Carry the shared balance value to heap scan and disable shared costing
-	 */
-	if (VacuumSharedCostBalance)
-	{
-		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
-		VacuumSharedCostBalance = NULL;
-		VacuumActiveNWorkers = NULL;
-	}
-}
-
-/*
- * Index vacuum/cleanup routine used by the leader process and parallel
- * vacuum worker processes to process the indexes in parallel.
- */
-static void
-do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
-{
-	/*
-	 * Increment the active worker count if we are able to launch any worker.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
-
-	/* Loop until all indexes are vacuumed */
-	for (;;)
-	{
-		int			idx;
-		LVSharedIndStats *shared_istat;
-		Relation	indrel;
-		IndexBulkDeleteResult *istat;
-
-		/* Get an index number to process */
-		idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1);
-
-		/* Done for all indexes? */
-		if (idx >= vacrel->nindexes)
-			break;
-
-		/* Get the index statistics space from DSM, if any */
-		shared_istat = parallel_stats_for_idx(lvshared, idx);
-
-		/* Skip indexes not participating in parallelism */
-		if (shared_istat == NULL)
-			continue;
-
-		indrel = vacrel->indrels[idx];
-
-		/*
-		 * Skip processing indexes that are unsafe for workers (these are
-		 * processed in do_serial_processing_for_unsafe_indexes() by leader)
-		 */
-		if (!parallel_processing_is_safe(indrel, lvshared))
-			continue;
-
-		/* Do vacuum or cleanup of the index */
-		istat = vacrel->indstats[idx];
-		vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
-														   lvshared,
-														   shared_istat,
-														   vacrel);
-	}
-
-	/*
-	 * We have completed the index vacuum so decrement the active worker
-	 * count.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
-}
-
-/*
- * Perform parallel processing of indexes in leader process.
- *
- * Handles index vacuuming (or index cleanup) for indexes that are not
- * parallel safe.  It's possible that this will vary for a given index, based
- * on details like whether we're performing for_cleanup processing right now.
- *
- * Also performs processing of smaller indexes that fell under the size cutoff
- * enforced by compute_parallel_vacuum_workers().  These indexes never get a
- * slot for statistics in DSM.
- */
-static void
-do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
-{
-	Assert(!IsParallelWorker());
-
-	/*
-	 * Increment the active worker count if we are able to launch any worker.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
-
-	for (int idx = 0; idx < vacrel->nindexes; idx++)
-	{
-		LVSharedIndStats *shared_istat;
-		Relation	indrel;
-		IndexBulkDeleteResult *istat;
-
-		shared_istat = parallel_stats_for_idx(lvshared, idx);
-		indrel = vacrel->indrels[idx];
-
-		/*
-		 * We're only here for the indexes that parallel workers won't
-		 * process.  Note that the shared_istat test ensures that we process
-		 * indexes that fell under initial size cutoff.
-		 */
-		if (shared_istat != NULL &&
-			parallel_processing_is_safe(indrel, lvshared))
-			continue;
-
-		/* Do vacuum or cleanup of the index */
-		istat = vacrel->indstats[idx];
-		vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
-														   lvshared,
-														   shared_istat,
-														   vacrel);
-	}
-
-	/*
-	 * We have completed the index vacuum so decrement the active worker
-	 * count.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
-}
-
-/*
- * Vacuum or cleanup index either by leader process or by one of the worker
- * process.  After processing the index this function copies the index
- * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
- * segment.
- */
-static IndexBulkDeleteResult *
-parallel_process_one_index(Relation indrel,
-						   IndexBulkDeleteResult *istat,
-						   LVShared *lvshared,
-						   LVSharedIndStats *shared_istat,
-						   LVRelState *vacrel)
-{
-	IndexBulkDeleteResult *istat_res;
-
-	/*
-	 * Update the pointer to the corresponding bulk-deletion result if someone
-	 * has already updated it
-	 */
-	if (shared_istat && shared_istat->updated && istat == NULL)
-		istat = &shared_istat->istat;
-
-	/* Do vacuum or cleanup of the index */
-	if (lvshared->for_cleanup)
-		istat_res = lazy_cleanup_one_index(indrel, istat, lvshared->reltuples,
-										   lvshared->estimated_count, vacrel);
-	else
-		istat_res = lazy_vacuum_one_index(indrel, istat, lvshared->reltuples,
-										  vacrel);
-
-	/*
-	 * Copy the index bulk-deletion result returned from ambulkdelete and
-	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
-	 * allocate locally and it's possible that an index will be vacuumed by a
-	 * different vacuum process the next cycle.  Copying the result normally
-	 * happens only the first time an index is vacuumed.  For any additional
-	 * vacuum pass, we directly point to the result on the DSM segment and
-	 * pass it to vacuum index APIs so that workers can update it directly.
-	 *
-	 * Since all vacuum workers write the bulk-deletion result at different
-	 * slots we can write them without locking.
-	 */
-	if (shared_istat && !shared_istat->updated && istat_res != NULL)
-	{
-		memcpy(&shared_istat->istat, istat_res, sizeof(IndexBulkDeleteResult));
-		shared_istat->updated = true;
-
-		/* Free the locally-allocated bulk-deletion result */
-		pfree(istat_res);
-
-		/* return the pointer to the result from shared memory */
-		return &shared_istat->istat;
-	}
-
-	return istat_res;
-}
-
 /*
  *	lazy_cleanup_all_indexes() -- cleanup all indexes of relation.
  */
@@ -3002,7 +2483,8 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 	else
 	{
 		/* Outsource everything to parallel variant */
-		do_parallel_lazy_cleanup_all_indexes(vacrel);
+		perform_parallel_index_cleanup(vacrel->pvc, vacrel->new_rel_tuples,
+									   (vacrel->tupcount_pages < vacrel->rel_pages));
 	}
 }
 
@@ -3048,13 +2530,7 @@ lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat,
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
 	/* Do bulk deletion */
-	istat = index_bulk_delete(&ivinfo, istat, lazy_tid_reaped,
-							  (void *) vacrel->dead_tuples);
-
-	ereport(elevel,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
-					vacrel->indname, vacrel->dead_tuples->num_tuples),
-			 errdetail_internal("%s", pg_rusage_show(&ru0))));
+	istat = vacuum_one_index(&ivinfo, istat, vacrel->dead_tuples);
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
@@ -3088,7 +2564,6 @@ lazy_cleanup_one_index(Relation indrel, IndexBulkDeleteResult *istat,
 	ivinfo.report_progress = false;
 	ivinfo.estimated_count = estimated_count;
 	ivinfo.message_level = elevel;
-
 	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vacrel->bstrategy;
 
@@ -3104,24 +2579,7 @@ lazy_cleanup_one_index(Relation indrel, IndexBulkDeleteResult *istat,
 							 VACUUM_ERRCB_PHASE_INDEX_CLEANUP,
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
-	istat = index_vacuum_cleanup(&ivinfo, istat);
-
-	if (istat)
-	{
-		ereport(elevel,
-				(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
-						RelationGetRelationName(indrel),
-						istat->num_index_tuples,
-						istat->num_pages),
-				 errdetail("%.0f index row versions were removed.\n"
-						   "%u index pages were newly deleted.\n"
-						   "%u index pages are currently deleted, of which %u are currently reusable.\n"
-						   "%s.",
-						   istat->tuples_removed,
-						   istat->pages_newly_deleted,
-						   istat->pages_deleted, istat->pages_free,
-						   pg_rusage_show(&ru0))));
-	}
+	istat = cleanup_one_index(&ivinfo, istat);
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
@@ -3479,9 +2937,11 @@ compute_max_dead_tuples(BlockNumber relblocks, bool hasindex)
 static void
 lazy_space_alloc(LVRelState *vacrel, int nworkers, BlockNumber nblocks)
 {
-	LVDeadTuples *dead_tuples;
+	VacDeadTuples *dead_tuples;
 	long		maxtuples;
 
+	maxtuples = compute_max_dead_tuples(nblocks, vacrel->nindexes > 0);
+
 	/*
 	 * Initialize state for a parallel vacuum.  As of now, only one worker can
 	 * be used for an index, so we invoke parallelism only if there are at
@@ -3505,16 +2965,29 @@ lazy_space_alloc(LVRelState *vacrel, int nworkers, BlockNumber nblocks)
 								vacrel->relname)));
 		}
 		else
-			vacrel->lps = begin_parallel_vacuum(vacrel, nblocks, nworkers);
+		{
+			ParallelVacuumCtl ctl;
+
+			/* Create parallel vacuum context */
+			ctl.rel = vacrel->rel;
+			ctl.indrels = vacrel->indrels;
+			ctl.nindexes = vacrel->nindexes;
+			ctl.nrequested_workers = nworkers;
+			ctl.maxtuples = maxtuples;
+			ctl.elevel = elevel;
+			ctl.bstrategy = vacrel->bstrategy;
+			vacrel->pvc = begin_parallel_vacuum(&ctl);
+		}
 
 		/* If parallel mode started, we're done */
 		if (ParallelVacuumIsActive(vacrel))
+		{
+			vacrel->dead_tuples = get_vacuum_dead_tuples(vacrel->pvc);
 			return;
+		}
 	}
 
-	maxtuples = compute_max_dead_tuples(nblocks, vacrel->nindexes > 0);
-
-	dead_tuples = (LVDeadTuples *) palloc(SizeOfDeadTuples(maxtuples));
+	dead_tuples = (VacDeadTuples *) palloc(SizeOfDeadTuples(maxtuples));
 	dead_tuples->num_tuples = 0;
 	dead_tuples->max_tuples = (int) maxtuples;
 
@@ -3534,75 +3007,8 @@ lazy_space_free(LVRelState *vacrel)
 	 * End parallel mode before updating index statistics as we cannot write
 	 * during parallel mode.
 	 */
-	end_parallel_vacuum(vacrel);
-}
-
-/*
- *	lazy_tid_reaped() -- is a particular tid deletable?
- *
- *		This has the right signature to be an IndexBulkDeleteCallback.
- *
- *		Assumes dead_tuples array is in sorted order.
- */
-static bool
-lazy_tid_reaped(ItemPointer itemptr, void *state)
-{
-	LVDeadTuples *dead_tuples = (LVDeadTuples *) state;
-	int64		litem,
-				ritem,
-				item;
-	ItemPointer res;
-
-	litem = itemptr_encode(&dead_tuples->itemptrs[0]);
-	ritem = itemptr_encode(&dead_tuples->itemptrs[dead_tuples->num_tuples - 1]);
-	item = itemptr_encode(itemptr);
-
-	/*
-	 * Doing a simple bound check before bsearch() is useful to avoid the
-	 * extra cost of bsearch(), especially if dead tuples on the heap are
-	 * concentrated in a certain range.  Since this function is called for
-	 * every index tuple, it pays to be really fast.
-	 */
-	if (item < litem || item > ritem)
-		return false;
-
-	res = (ItemPointer) bsearch((void *) itemptr,
-								(void *) dead_tuples->itemptrs,
-								dead_tuples->num_tuples,
-								sizeof(ItemPointerData),
-								vac_cmp_itemptr);
-
-	return (res != NULL);
-}
-
-/*
- * Comparator routines for use with qsort() and bsearch().
- */
-static int
-vac_cmp_itemptr(const void *left, const void *right)
-{
-	BlockNumber lblk,
-				rblk;
-	OffsetNumber loff,
-				roff;
-
-	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
-	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
-
-	if (lblk < rblk)
-		return -1;
-	if (lblk > rblk)
-		return 1;
-
-	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
-	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
-
-	if (loff < roff)
-		return -1;
-	if (loff > roff)
-		return 1;
-
-	return 0;
+	end_parallel_vacuum(vacrel->pvc, vacrel->indstats);
+	vacrel->pvc = NULL;
 }
 
 /*
@@ -3725,76 +3131,6 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 	return all_visible;
 }
 
-/*
- * Compute the number of parallel worker processes to request.  Both index
- * vacuum and index cleanup can be executed with parallel workers.  The index
- * is eligible for parallel vacuum iff its size is greater than
- * min_parallel_index_scan_size as invoking workers for very small indexes
- * can hurt performance.
- *
- * nrequested is the number of parallel workers that user requested.  If
- * nrequested is 0, we compute the parallel degree based on nindexes, that is
- * the number of indexes that support parallel vacuum.  This function also
- * sets will_parallel_vacuum to remember indexes that participate in parallel
- * vacuum.
- */
-static int
-compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
-								bool *will_parallel_vacuum)
-{
-	int			nindexes_parallel = 0;
-	int			nindexes_parallel_bulkdel = 0;
-	int			nindexes_parallel_cleanup = 0;
-	int			parallel_workers;
-
-	/*
-	 * We don't allow performing parallel operation in standalone backend or
-	 * when parallelism is disabled.
-	 */
-	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
-		return 0;
-
-	/*
-	 * Compute the number of indexes that can participate in parallel vacuum.
-	 */
-	for (int idx = 0; idx < vacrel->nindexes; idx++)
-	{
-		Relation	indrel = vacrel->indrels[idx];
-		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
-			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
-			continue;
-
-		will_parallel_vacuum[idx] = true;
-
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
-			nindexes_parallel_bulkdel++;
-		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-			nindexes_parallel_cleanup++;
-	}
-
-	nindexes_parallel = Max(nindexes_parallel_bulkdel,
-							nindexes_parallel_cleanup);
-
-	/* The leader process takes one index */
-	nindexes_parallel--;
-
-	/* No index supports parallel vacuum */
-	if (nindexes_parallel <= 0)
-		return 0;
-
-	/* Compute the parallel degree */
-	parallel_workers = (nrequested > 0) ?
-		Min(nrequested, nindexes_parallel) : nindexes_parallel;
-
-	/* Cap by max_parallel_maintenance_workers */
-	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
-
-	return parallel_workers;
-}
-
 /*
  * Update index statistics in pg_class if the statistics are accurate.
  */
@@ -3827,426 +3163,6 @@ update_index_statistics(LVRelState *vacrel)
 	}
 }
 
-/*
- * This function prepares and returns parallel vacuum state if we can launch
- * even one worker.  This function is responsible for entering parallel mode,
- * create a parallel context, and then initialize the DSM segment.
- */
-static LVParallelState *
-begin_parallel_vacuum(LVRelState *vacrel, BlockNumber nblocks,
-					  int nrequested)
-{
-	LVParallelState *lps = NULL;
-	Relation   *indrels = vacrel->indrels;
-	int			nindexes = vacrel->nindexes;
-	ParallelContext *pcxt;
-	LVShared   *shared;
-	LVDeadTuples *dead_tuples;
-	BufferUsage *buffer_usage;
-	WalUsage   *wal_usage;
-	bool	   *will_parallel_vacuum;
-	long		maxtuples;
-	Size		est_shared;
-	Size		est_deadtuples;
-	int			nindexes_mwm = 0;
-	int			parallel_workers = 0;
-	int			querylen;
-
-	/*
-	 * A parallel vacuum must be requested and there must be indexes on the
-	 * relation
-	 */
-	Assert(nrequested >= 0);
-	Assert(nindexes > 0);
-
-	/*
-	 * Compute the number of parallel vacuum workers to launch
-	 */
-	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
-	parallel_workers = compute_parallel_vacuum_workers(vacrel,
-													   nrequested,
-													   will_parallel_vacuum);
-
-	/* Can't perform vacuum in parallel */
-	if (parallel_workers <= 0)
-	{
-		pfree(will_parallel_vacuum);
-		return lps;
-	}
-
-	lps = (LVParallelState *) palloc0(sizeof(LVParallelState));
-
-	EnterParallelMode();
-	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
-								 parallel_workers);
-	Assert(pcxt->nworkers > 0);
-	lps->pcxt = pcxt;
-
-	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
-	est_shared = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		Relation	indrel = indrels[idx];
-		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-		/*
-		 * Cleanup option should be either disabled, always performing in
-		 * parallel or conditionally performing in parallel.
-		 */
-		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
-			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
-		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
-
-		/* Skip indexes that don't participate in parallel vacuum */
-		if (!will_parallel_vacuum[idx])
-			continue;
-
-		if (indrel->rd_indam->amusemaintenanceworkmem)
-			nindexes_mwm++;
-
-		est_shared = add_size(est_shared, sizeof(LVSharedIndStats));
-
-		/*
-		 * Remember the number of indexes that support parallel operation for
-		 * each phase.
-		 */
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
-			lps->nindexes_parallel_bulkdel++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
-			lps->nindexes_parallel_cleanup++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
-			lps->nindexes_parallel_condcleanup++;
-	}
-	shm_toc_estimate_chunk(&pcxt->estimator, est_shared);
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-	/* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */
-	maxtuples = compute_max_dead_tuples(nblocks, true);
-	est_deadtuples = MAXALIGN(SizeOfDeadTuples(maxtuples));
-	shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples);
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-	/*
-	 * Estimate space for BufferUsage and WalUsage --
-	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
-	 *
-	 * If there are no extensions loaded that care, we could skip this.  We
-	 * have no way of knowing whether anyone's looking at pgBufferUsage or
-	 * pgWalUsage, so do it unconditionally.
-	 */
-	shm_toc_estimate_chunk(&pcxt->estimator,
-						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-	shm_toc_estimate_chunk(&pcxt->estimator,
-						   mul_size(sizeof(WalUsage), pcxt->nworkers));
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
-	if (debug_query_string)
-	{
-		querylen = strlen(debug_query_string);
-		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
-		shm_toc_estimate_keys(&pcxt->estimator, 1);
-	}
-	else
-		querylen = 0;			/* keep compiler quiet */
-
-	InitializeParallelDSM(pcxt);
-
-	/* Prepare shared information */
-	shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared);
-	MemSet(shared, 0, est_shared);
-	shared->relid = RelationGetRelid(vacrel->rel);
-	shared->elevel = elevel;
-	shared->maintenance_work_mem_worker =
-		(nindexes_mwm > 0) ?
-		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
-		maintenance_work_mem;
-
-	pg_atomic_init_u32(&(shared->cost_balance), 0);
-	pg_atomic_init_u32(&(shared->active_nworkers), 0);
-	pg_atomic_init_u32(&(shared->idx), 0);
-	shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
-
-	/*
-	 * Initialize variables for shared index statistics, set NULL bitmap and
-	 * the size of stats for each index.
-	 */
-	memset(shared->bitmap, 0x00, BITMAPLEN(nindexes));
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		if (!will_parallel_vacuum[idx])
-			continue;
-
-		/* Set NOT NULL as this index does support parallelism */
-		shared->bitmap[idx >> 3] |= 1 << (idx & 0x07);
-	}
-
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
-	lps->lvshared = shared;
-
-	/* Prepare the dead tuple space */
-	dead_tuples = (LVDeadTuples *) shm_toc_allocate(pcxt->toc, est_deadtuples);
-	dead_tuples->max_tuples = maxtuples;
-	dead_tuples->num_tuples = 0;
-	MemSet(dead_tuples->itemptrs, 0, sizeof(ItemPointerData) * maxtuples);
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
-	vacrel->dead_tuples = dead_tuples;
-
-	/*
-	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
-	 * initialize
-	 */
-	buffer_usage = shm_toc_allocate(pcxt->toc,
-									mul_size(sizeof(BufferUsage), pcxt->nworkers));
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
-	lps->buffer_usage = buffer_usage;
-	wal_usage = shm_toc_allocate(pcxt->toc,
-								 mul_size(sizeof(WalUsage), pcxt->nworkers));
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
-	lps->wal_usage = wal_usage;
-
-	/* Store query string for workers */
-	if (debug_query_string)
-	{
-		char	   *sharedquery;
-
-		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
-		memcpy(sharedquery, debug_query_string, querylen + 1);
-		sharedquery[querylen] = '\0';
-		shm_toc_insert(pcxt->toc,
-					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
-	}
-
-	pfree(will_parallel_vacuum);
-	return lps;
-}
-
-/*
- * Destroy the parallel context, and end parallel mode.
- *
- * Since writes are not allowed during parallel mode, copy the
- * updated index statistics from DSM into local memory and then later use that
- * to update the index statistics.  One might think that we can exit from
- * parallel mode, update the index statistics and then destroy parallel
- * context, but that won't be safe (see ExitParallelMode).
- */
-static void
-end_parallel_vacuum(LVRelState *vacrel)
-{
-	IndexBulkDeleteResult **indstats = vacrel->indstats;
-	LVParallelState *lps = vacrel->lps;
-	int			nindexes = vacrel->nindexes;
-
-	Assert(!IsParallelWorker());
-
-	/* Copy the updated statistics */
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		LVSharedIndStats *shared_istat;
-
-		shared_istat = parallel_stats_for_idx(lps->lvshared, idx);
-
-		/*
-		 * Skip index -- it must have been processed by the leader, from
-		 * inside do_serial_processing_for_unsafe_indexes()
-		 */
-		if (shared_istat == NULL)
-			continue;
-
-		if (shared_istat->updated)
-		{
-			indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-			memcpy(indstats[idx], &shared_istat->istat, sizeof(IndexBulkDeleteResult));
-		}
-		else
-			indstats[idx] = NULL;
-	}
-
-	DestroyParallelContext(lps->pcxt);
-	ExitParallelMode();
-
-	/* Deactivate parallel vacuum */
-	pfree(lps);
-	vacrel->lps = NULL;
-}
-
-/*
- * Return shared memory statistics for index at offset 'getidx', if any
- *
- * Returning NULL indicates that compute_parallel_vacuum_workers() determined
- * that the index is a totally unsuitable target for all parallel processing
- * up front.  For example, the index could be < min_parallel_index_scan_size
- * cutoff.
- */
-static LVSharedIndStats *
-parallel_stats_for_idx(LVShared *lvshared, int getidx)
-{
-	char	   *p;
-
-	if (IndStatsIsNull(lvshared, getidx))
-		return NULL;
-
-	p = (char *) GetSharedIndStats(lvshared);
-	for (int idx = 0; idx < getidx; idx++)
-	{
-		if (IndStatsIsNull(lvshared, idx))
-			continue;
-
-		p += sizeof(LVSharedIndStats);
-	}
-
-	return (LVSharedIndStats *) p;
-}
-
-/*
- * Returns false, if the given index can't participate in parallel index
- * vacuum or parallel index cleanup
- */
-static bool
-parallel_processing_is_safe(Relation indrel, LVShared *lvshared)
-{
-	uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-	/* first_time must be true only if for_cleanup is true */
-	Assert(lvshared->for_cleanup || !lvshared->first_time);
-
-	if (lvshared->for_cleanup)
-	{
-		/* Skip, if the index does not support parallel cleanup */
-		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
-			return false;
-
-		/*
-		 * Skip, if the index supports parallel cleanup conditionally, but we
-		 * have already processed the index (for bulkdelete).  See the
-		 * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
-		 * when indexes support parallel cleanup conditionally.
-		 */
-		if (!lvshared->first_time &&
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-			return false;
-	}
-	else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0)
-	{
-		/* Skip if the index does not support parallel bulk deletion */
-		return false;
-	}
-
-	return true;
-}
-
-/*
- * Perform work within a launched parallel process.
- *
- * Since parallel vacuum workers perform only index vacuum or index cleanup,
- * we don't need to report progress information.
- */
-void
-parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
-{
-	Relation	rel;
-	Relation   *indrels;
-	LVShared   *lvshared;
-	LVDeadTuples *dead_tuples;
-	BufferUsage *buffer_usage;
-	WalUsage   *wal_usage;
-	int			nindexes;
-	char	   *sharedquery;
-	LVRelState	vacrel;
-	ErrorContextCallback errcallback;
-
-	lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
-										   false);
-	elevel = lvshared->elevel;
-
-	if (lvshared->for_cleanup)
-		elog(DEBUG1, "starting parallel vacuum worker for cleanup");
-	else
-		elog(DEBUG1, "starting parallel vacuum worker for bulk delete");
-
-	/* Set debug_query_string for individual workers */
-	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
-	debug_query_string = sharedquery;
-	pgstat_report_activity(STATE_RUNNING, debug_query_string);
-
-	/*
-	 * Open table.  The lock mode is the same as the leader process.  It's
-	 * okay because the lock mode does not conflict among the parallel
-	 * workers.
-	 */
-	rel = table_open(lvshared->relid, ShareUpdateExclusiveLock);
-
-	/*
-	 * Open all indexes. indrels are sorted in order by OID, which should be
-	 * matched to the leader's one.
-	 */
-	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
-	Assert(nindexes > 0);
-
-	/* Set dead tuple space */
-	dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc,
-												  PARALLEL_VACUUM_KEY_DEAD_TUPLES,
-												  false);
-
-	/* Set cost-based vacuum delay */
-	VacuumCostActive = (VacuumCostDelay > 0);
-	VacuumCostBalance = 0;
-	VacuumPageHit = 0;
-	VacuumPageMiss = 0;
-	VacuumPageDirty = 0;
-	VacuumCostBalanceLocal = 0;
-	VacuumSharedCostBalance = &(lvshared->cost_balance);
-	VacuumActiveNWorkers = &(lvshared->active_nworkers);
-
-	vacrel.rel = rel;
-	vacrel.indrels = indrels;
-	vacrel.nindexes = nindexes;
-	/* Each parallel VACUUM worker gets its own access strategy */
-	vacrel.bstrategy = GetAccessStrategy(BAS_VACUUM);
-	vacrel.indstats = (IndexBulkDeleteResult **)
-		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
-
-	if (lvshared->maintenance_work_mem_worker > 0)
-		maintenance_work_mem = lvshared->maintenance_work_mem_worker;
-
-	/*
-	 * Initialize vacrel for use as error callback arg by parallel worker.
-	 */
-	vacrel.relnamespace = get_namespace_name(RelationGetNamespace(rel));
-	vacrel.relname = pstrdup(RelationGetRelationName(rel));
-	vacrel.indname = NULL;
-	vacrel.phase = VACUUM_ERRCB_PHASE_UNKNOWN;	/* Not yet processing */
-	vacrel.dead_tuples = dead_tuples;
-
-	/* Setup error traceback support for ereport() */
-	errcallback.callback = vacuum_error_callback;
-	errcallback.arg = &vacrel;
-	errcallback.previous = error_context_stack;
-	error_context_stack = &errcallback;
-
-	/* Prepare to track buffer usage during parallel execution */
-	InstrStartParallelQuery();
-
-	/* Process indexes to perform vacuum/cleanup */
-	do_parallel_processing(&vacrel, lvshared);
-
-	/* Report buffer/WAL usage during parallel execution */
-	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
-	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
-	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
-						  &wal_usage[ParallelWorkerNumber]);
-
-	/* Pop the error context stack */
-	error_context_stack = errcallback.previous;
-
-	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
-	table_close(rel, ShareUpdateExclusiveLock);
-	FreeAccessStrategy(vacrel.bstrategy);
-	pfree(vacrel.indstats);
-}
-
 /*
  * Error context callback for errors occurring during vacuum.
  */
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index bb1881f573..6b427772d5 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index e8504f0ae4..48f7348f91 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -59,6 +59,7 @@ OBJS = \
 	typecmds.o \
 	user.o \
 	vacuum.o \
+	vacuumparallel.o \
 	variable.o \
 	view.o
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 5c4bc15b44..2fcb576540 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -32,6 +32,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
+#include "catalog/index.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
@@ -51,6 +52,7 @@
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/pg_rusage.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
@@ -90,6 +92,9 @@ static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
 static double compute_parallel_delay(void);
 static VacOptValue get_vacoptval_from_boolean(DefElem *def);
 
+static bool vac_tid_reaped(ItemPointer itemptr, void *state);
+static int	vac_cmp_itemptr(const void *left, const void *right);
+
 /*
  * Primary entry point for manual VACUUM and ANALYZE commands
  *
@@ -2258,3 +2263,140 @@ get_vacoptval_from_boolean(DefElem *def)
 {
 	return defGetBoolean(def) ? VACOPTVALUE_ENABLED : VACOPTVALUE_DISABLED;
 }
+
+/*
+ *	vacuum_one_index() -- vacuum index relation.
+ *
+ *		Delete all the index entries pointing to tuples listed in
+ *		dead_tuples, and update running statistics.
+ *
+ *		The number of heap tuples to pass to the bulkdelete callback is
+ *		taken from ivinfo->num_heap_tuples; it's always assumed to be an
+ *		estimate.
+ *
+ * Returns bulk delete stats derived from input stats
+ */
+IndexBulkDeleteResult *
+vacuum_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat,
+				 VacDeadTuples *dead_tuples)
+{
+	PGRUsage	ru0;
+
+	pg_rusage_init(&ru0);
+
+	/* Do bulk deletion */
+	istat = index_bulk_delete(ivinfo, istat, vac_tid_reaped,
+							  (void *) dead_tuples);
+
+	ereport(ivinfo->message_level,
+			(errmsg("scanned index \"%s\" to remove %d row versions",
+					RelationGetRelationName(ivinfo->index),
+					dead_tuples->num_tuples),
+			 errdetail_internal("%s", pg_rusage_show(&ru0))));
+
+	return istat;
+}
+
+/*
+ *	cleanup_one_index() -- do post-vacuum cleanup for index relation.
+ *
+ *		The number of heap tuples and whether it is an estimated value are
+ *		taken from ivinfo->num_heap_tuples and ivinfo->estimated_count.
+ *
+ * Returns bulk delete stats derived from input stats
+ */
+IndexBulkDeleteResult *
+cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
+{
+	PGRUsage	ru0;
+
+	pg_rusage_init(&ru0);
+
+	istat = index_vacuum_cleanup(ivinfo, istat);
+
+	if (istat)
+	{
+		ereport(ivinfo->message_level,
+				(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
+						RelationGetRelationName(ivinfo->index),
+						istat->num_index_tuples,
+						istat->num_pages),
+				 errdetail("%.0f index row versions were removed.\n"
+						   "%u index pages were newly deleted.\n"
+						   "%u index pages are currently deleted, of which %u are currently reusable.\n"
+						   "%s.",
+						   istat->tuples_removed,
+						   istat->pages_newly_deleted,
+						   istat->pages_deleted, istat->pages_free,
+						   pg_rusage_show(&ru0))));
+	}
+
+	return istat;
+}
+
+/*
+ *	vac_tid_reaped() -- is a particular tid deletable?
+ *
+ *		This has the right signature to be an IndexBulkDeleteCallback.
+ *
+ *		Assumes dead_tuples array is in sorted order.
+ */
+static bool
+vac_tid_reaped(ItemPointer itemptr, void *state)
+{
+	VacDeadTuples *dead_tuples = (VacDeadTuples *) state;
+	int64		litem,
+				ritem,
+				item;
+	ItemPointer res;
+
+	litem = itemptr_encode(&dead_tuples->itemptrs[0]);
+	ritem = itemptr_encode(&dead_tuples->itemptrs[dead_tuples->num_tuples - 1]);
+	item = itemptr_encode(itemptr);
+
+	/*
+	 * Doing a simple bound check before bsearch() is useful to avoid the
+	 * extra cost of bsearch(), especially if dead tuples on the heap are
+	 * concentrated in a certain range.  Since this function is called for
+	 * every index tuple, it pays to be really fast.
+	 */
+	if (item < litem || item > ritem)
+		return false;
+
+	res = (ItemPointer) bsearch((void *) itemptr,
+								(void *) dead_tuples->itemptrs,
+								dead_tuples->num_tuples,
+								sizeof(ItemPointerData),
+								vac_cmp_itemptr);
+
+	return (res != NULL);
+}
+
+/*
+ * Comparator routines for use with qsort() and bsearch().
+ */
+static int
+vac_cmp_itemptr(const void *left, const void *right)
+{
+	BlockNumber lblk,
+				rblk;
+	OffsetNumber loff,
+				roff;
+
+	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
+	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
+
+	if (lblk < rblk)
+		return -1;
+	if (lblk > rblk)
+		return 1;
+
+	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
+	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
+
+	if (loff < roff)
+		return -1;
+	if (loff > roff)
+		return 1;
+
+	return 0;
+}
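
A minimal standalone sketch of the technique vac_tid_reaped() relies on: encode each TID as a 64-bit key whose integer order matches (block, offset) order, reject out-of-range probes with a cheap bound check, and only then bsearch() the sorted array.  This is a simplified illustration without the PostgreSQL data structures; the names and encoding here are illustrative only.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Encode (block, offset) so that integer order matches TID order. */
static int64_t
tid_key(uint32_t block, uint16_t offset)
{
	return ((int64_t) block << 16) | offset;
}

/* Comparator with the right signature for bsearch() over int64_t keys. */
static int
cmp_key(const void *a, const void *b)
{
	int64_t		la = *(const int64_t *) a;
	int64_t		rb = *(const int64_t *) b;

	return (la > rb) - (la < rb);
}

/* Is "key" present in the sorted dead-TID key array? */
static int
tid_is_dead(const int64_t *keys, int nkeys, int64_t key)
{
	/* Cheap bound check avoids bsearch() for out-of-range probes. */
	if (nkeys == 0 || key < keys[0] || key > keys[nkeys - 1])
		return 0;

	return bsearch(&key, keys, nkeys, sizeof(int64_t), cmp_key) != NULL;
}

int
main(void)
{
	/* Dead TIDs, already sorted by (block, offset). */
	int64_t		keys[] = {
		tid_key(10, 1), tid_key(10, 7), tid_key(42, 3), tid_key(500, 20),
	};
	int			nkeys = sizeof(keys) / sizeof(keys[0]);

	printf("(42,3) dead?  %d\n", tid_is_dead(keys, nkeys, tid_key(42, 3)));
	printf("(600,1) dead? %d\n", tid_is_dead(keys, nkeys, tid_key(600, 1)));
	return 0;
}
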
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
new file mode 100644
index 0000000000..165fa28be4
--- /dev/null
+++ b/src/backend/commands/vacuumparallel.c
@@ -0,0 +1,1042 @@
+/*-------------------------------------------------------------------------
+ *
+ * vacuumparallel.c
+ *	  Support routines for parallel vacuum execution.
+ *
+ * This file contains routines for setting up, using, and tearing down a
+ * ParallelVacuumContext.
+ *
+ * In a parallel vacuum, we perform both index bulk-deletion and index cleanup
+ * with parallel worker processes.  Individual indexes are processed by one
+ * vacuum process.  ParallelVacuumContext contains the shared information as
+ * well as the memory space for storing dead tuples, both allocated in the DSM
+ * segment.  When starting either parallel index bulk-deletion or index
+ * cleanup, we launch parallel worker processes.  Once all indexes are
+ * processed, the parallel worker processes exit.  The next time, the parallel
+ * context is re-initialized so that the same DSM can be used for multiple
+ * passes of index bulk-deletion and index cleanup.  At the end of a parallel
+ * vacuum, ParallelVacuumContext is destroyed and the index statistics are
+ * returned to the caller so that it can update them after exiting from
+ * parallel mode.
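+ *
+ * A minimal sketch of the call sequence expected from a leader process (based
+ * on this patch's usage in vacuumlazy.c; the non-parallel fallback and error
+ * handling are omitted, and local variable names are illustrative):
+ *
+ *		ParallelVacuumCtl ctl;
+ *
+ *		ctl.rel = rel;
+ *		ctl.indrels = indrels;
+ *		ctl.nindexes = nindexes;
+ *		ctl.nrequested_workers = nrequested_workers;
+ *		ctl.maxtuples = maxtuples;
+ *		ctl.elevel = elevel;
+ *		ctl.bstrategy = bstrategy;
+ *
+ *		pvc = begin_parallel_vacuum(&ctl);	/* NULL if no worker usable */
+ *		dead_tuples = get_vacuum_dead_tuples(pvc);
+ *		... collect dead tuples while scanning the heap ...
+ *		perform_parallel_index_bulkdel(pvc, old_live_tuples);
+ *		... possibly more heap scan and bulk-deletion passes ...
+ *		perform_parallel_index_cleanup(pvc, new_rel_tuples, estimated_count);
+ *		end_parallel_vacuum(pvc, indstats);	/* copies index stats back */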
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/vacuumparallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/amapi.h"
+#include "access/genam.h"
+#include "access/parallel.h"
+#include "access/table.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "optimizer/paths.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "storage/lmgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/elog.h"
+#include "utils/rel.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+
+/*
+ * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
+ * we don't need to worry about DSM keys conflicting with plan_node_id we can
+ * use small integers.
+ */
+#define PARALLEL_VACUUM_KEY_SHARED			1
+#define PARALLEL_VACUUM_KEY_DEAD_TUPLES		2
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
+#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
+#define PARALLEL_VACUUM_KEY_WAL_USAGE		5
+#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
+
+/*
+ * Shared information among parallel workers.  This is allocated in the DSM
+ * segment.
+ */
+typedef struct PVShared
+{
+	/*
+	 * Target table relid and log level.  These fields are not modified during
+	 * the lazy vacuum.
+	 */
+	Oid			relid;
+	int			elevel;
+
+	/*
+	 * Fields for both index vacuum and cleanup.
+	 *
+	 * num_table_tuples is the total number of input heap tuples.  We set it
+	 * to the old live tuples in the index vacuum case and to the new live
+	 * tuples in the index cleanup case.
+	 *
+	 * estimated_count is true if num_table_tuples is an estimated value.
+	 * (Note that num_table_tuples could be -1 in this case, indicating we
+	 * have no idea.)
+	 */
+	double		num_table_tuples;
+	bool		estimated_count;
+
+	/*
+	 * In a single-process lazy vacuum, we could consume more memory during
+	 * index vacuuming or cleanup on top of the memory used for heap scanning.
+	 * In a parallel vacuum, since individual vacuum workers can each consume
+	 * memory up to maintenance_work_mem, the new maintenance_work_mem for
+	 * each worker is set such that the parallel operation doesn't consume
+	 * more memory than a single-process lazy vacuum.
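+	 *
+	 * For example, assuming the same division as the pre-existing code
+	 * (maintenance_work_mem / Min(planned workers, indexes that use it)),
+	 * with maintenance_work_mem = 1GB, 4 planned workers and 3 such indexes,
+	 * each process would get 1GB / 3, roughly 341MB.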
+	 */
+	int			maintenance_work_mem_worker;
+
+	/*
+	 * Shared vacuum cost balance.  During parallel vacuum,
+	 * VacuumSharedCostBalance points to this value and it accumulates the
+	 * balance of each parallel vacuum worker.
+	 */
+	pg_atomic_uint32 cost_balance;
+
+	/*
+	 * Number of active parallel workers.  This is used for computing the
+	 * minimum threshold of the vacuum cost balance before a worker sleeps for
+	 * cost-based delay.
+	 */
+	pg_atomic_uint32 active_nworkers;
+
+	/* Counter for vacuuming and cleanup */
+	pg_atomic_uint32 idx;
+} PVShared;
+
+/* Status used during parallel index vacuum or cleanup */
+typedef enum PVIndVacStatus
+{
+	INDVAC_STATUS_INITIAL = 0,
+	INDVAC_STATUS_NEED_BULKDELETE,
+	INDVAC_STATUS_NEED_CLEANUP,
+	INDVAC_STATUS_COMPLETED,
+} PVIndVacStatus;
+
+/*
+ * Struct for the per-index vacuum status and bulk-deletion statistics used
+ * for parallel vacuum.  This is allocated in the DSM segment.
+ */
+typedef struct PVIndStats
+{
+	/*
+	 * The following two fields are set by the leader process before executing
+	 * parallel index vacuum or parallel index cleanup.
+	 *
+	 * parallel_workers_can_process is true if both the leader and workers can
+	 * process the index; otherwise only the leader can process it.  This
+	 * value is not fixed for the entire VACUUM operation: it is fixed only
+	 * for an individual parallel index vacuum or cleanup pass.
+	 */
+	PVIndVacStatus status;
+	bool	parallel_workers_can_process;
+
+	/*
+	 * The leader or an individual worker stores the result of the index
+	 * vacuum or cleanup here.
+	 */
+	bool	istat_updated;		/* are the stats updated? */
+	IndexBulkDeleteResult istat;
+} PVIndStats;
+
+/*
+ * Struct for the per-process state used during parallel index vacuum or
+ * index cleanup.
+ */
+typedef struct PVState
+{
+	/* Target indexes */
+	Relation	*indrels;
+	int			nindexes;
+
+	PVIndStats	*indstats;
+	PVShared	*shared;
+	VacDeadTuples *dead_tuples;
+	BufferAccessStrategy bstrategy;
+
+	/* Error reporting state */
+	char	*relnamespace;
+	char	*relname;
+	char	*indname;
+	PVIndVacStatus status;
+} PVState;
+
+/* Struct for maintaining a parallel vacuum state. */
+typedef struct ParallelVacuumContext
+{
+	ParallelContext *pcxt;
+
+	/* Target indexes */
+	Relation	*indrels;
+	int			nindexes;
+
+	/* Shared information among parallel vacuum workers */
+	PVShared   *shared;
+
+	/* Shared index statistics among parallel vacuum workers */
+	PVIndStats *indstats;
+
+	/* Shared dead tuple space among parallel vacuum workers */
+	VacDeadTuples	*dead_tuples;
+
+	/* Points to buffer usage area in DSM */
+	BufferUsage *buffer_usage;
+
+	/* Points to WAL usage area in DSM */
+	WalUsage   *wal_usage;
+
+	/*
+	 * The number of indexes that support parallel index bulk-deletion and
+	 * parallel index cleanup respectively.
+	 */
+	int			nindexes_parallel_bulkdel;
+	int			nindexes_parallel_cleanup;
+	int			nindexes_parallel_condcleanup;
+
+	/* Incremented by each bulkdel or cleanup */
+	int			num_index_scans;
+
+	/* Buffer access strategy used by leader process */
+	BufferAccessStrategy bstrategy;
+} ParallelVacuumContext;
+
+static int compute_parallel_vacuum_workers(Relation *indrels, int nindexes,
+										   int nrequested);
+static void set_parallel_vacuum_index_status(ParallelVacuumContext *pvc,
+											 bool bulkdel);
+static void parallel_vacuum_all_indexes(ParallelVacuumContext *pvc, bool bulkdel);
+static void parallel_vacuum_indexes(PVState *pvstate);
+static void serial_vacuum_unsafe_indexes(PVState *pvstate);
+static void parallel_vacuum_one_index(PVState *pvstate, Relation indrel,
+									  PVIndStats *stats);
+static bool index_parallel_vacuum_is_safe(Relation indrel, int num_index_scans,
+											  bool bulkdel);
+static void parallel_index_vacuum_error_callback(void *arg);
+
+/*
+ * Prepare and return a parallel vacuum context if we can launch at least one
+ * parallel worker; otherwise return NULL.  This function is responsible for
+ * entering parallel mode, creating a parallel context, and initializing the
+ * DSM segment.
+ */
+ParallelVacuumContext *
+begin_parallel_vacuum(ParallelVacuumCtl *pvctl)
+{
+	ParallelVacuumContext *pvc = NULL;
+	ParallelContext *pcxt;
+	PVIndStats	*indstats;
+	PVShared   	*shared;
+	VacDeadTuples	*dead_tuples;
+	BufferUsage		*buffer_usage;
+	WalUsage   *wal_usage;
+	Size		est_indstats = 0;
+	Size		est_shared = 0;
+	Size		est_deadtuples = 0;
+	int			nindexes_mwm = 0;
+	int			parallel_workers = 0;
+	int			querylen;
+
+	/*
+	 * A parallel vacuum must be requested and there must be indexes on the
+	 * relation
+	 */
+	Assert(pvctl);
+	Assert(pvctl->nrequested_workers >= 0);
+	Assert(pvctl->nindexes > 0);
+
+	/*
+	 * Compute the number of parallel vacuum workers to launch
+	 */
+	parallel_workers = compute_parallel_vacuum_workers(pvctl->indrels,
+													   pvctl->nindexes,
+													   pvctl->nrequested_workers);
+
+	/* Can't perform vacuum in parallel */
+	if (parallel_workers <= 0)
+		return pvc;
+
+	pvc = (ParallelVacuumContext *) palloc0(sizeof(ParallelVacuumContext));
+	pvc->indrels = pvctl->indrels;
+	pvc->nindexes = pvctl->nindexes;
+	pvc->bstrategy = pvctl->bstrategy;
+
+	EnterParallelMode();
+	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
+								 parallel_workers);
+	Assert(pcxt->nworkers > 0);
+	pvc->pcxt = pcxt;
+
+	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_STATS */
+	est_indstats = mul_size(sizeof(PVIndStats), pvctl->nindexes);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_indstats);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+	est_shared = MAXALIGN(sizeof(PVShared));
+	shm_toc_estimate_chunk(&pcxt->estimator, est_shared);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */
+	est_deadtuples = MAXALIGN(SizeOfDeadTuples(pvctl->maxtuples));
+	shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/*
+	 * Estimate space for BufferUsage and WalUsage --
+	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
+	 *
+	 * If there are no extensions loaded that care, we could skip this.  We
+	 * have no way of knowing whether anyone's looking at pgBufferUsage or
+	 * pgWalUsage, so do it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
+	if (debug_query_string)
+	{
+		querylen = strlen(debug_query_string);
+		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+	else
+		querylen = 0;			/* keep compiler quiet */
+
+	InitializeParallelDSM(pcxt);
+
+	/* Prepare index vacuum stats */
+	indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats);
+	MemSet(indstats, 0, est_indstats);
+	for (int i = 0; i < pvctl->nindexes; i++)
+	{
+		Relation	indrel = pvctl->indrels[i];
+		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+		/*
+		 * Cleanup option should be either disabled, always performing in
+		 * parallel or conditionally performing in parallel.
+		 */
+		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
+			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
+		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+
+		/* Skip indexes that don't participate in parallel vacuum */
+		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
+			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
+			continue;
+
+		if (indrel->rd_indam->amusemaintenanceworkmem)
+			nindexes_mwm++;
+
+		/*
+		 * Remember the number of indexes that support parallel operation for
+		 * each phase.
+		 */
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+			pvc->nindexes_parallel_bulkdel++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
+			pvc->nindexes_parallel_cleanup++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
+			pvc->nindexes_parallel_condcleanup++;
+	}
+
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
+	pvc->indstats = indstats;
+
+	/* Prepare shared information */
+	shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared);
+	MemSet(shared, 0, est_shared);
+	shared->relid = RelationGetRelid(pvctl->rel);
+	shared->elevel = pvctl->elevel;
+	shared->maintenance_work_mem_worker =
+		(nindexes_mwm > 0) ?
+		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
+		maintenance_work_mem;
+
+	pg_atomic_init_u32(&(shared->cost_balance), 0);
+	pg_atomic_init_u32(&(shared->active_nworkers), 0);
+	pg_atomic_init_u32(&(shared->idx), 0);
+
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
+	pvc->shared = shared;
+
+	/* Prepare the dead tuple space */
+	dead_tuples = (VacDeadTuples *) shm_toc_allocate(pcxt->toc, est_deadtuples);
+	dead_tuples->max_tuples = pvctl->maxtuples;
+	dead_tuples->num_tuples = 0;
+	MemSet(dead_tuples->itemptrs, 0,
+		   sizeof(ItemPointerData) * pvctl->maxtuples);
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
+	pvc->dead_tuples = dead_tuples;
+
+	/*
+	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
+	 * initialize
+	 */
+	buffer_usage = shm_toc_allocate(pcxt->toc,
+									mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
+	pvc->buffer_usage = buffer_usage;
+	wal_usage = shm_toc_allocate(pcxt->toc,
+								 mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
+	pvc->wal_usage = wal_usage;
+
+	/* Store query string for workers */
+	if (debug_query_string)
+	{
+		char	   *sharedquery;
+
+		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+		memcpy(sharedquery, debug_query_string, querylen + 1);
+		sharedquery[querylen] = '\0';
+		shm_toc_insert(pcxt->toc,
+					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
+	}
+
+	return pvc;
+}
+
+/*
+ * Destroy the parallel context, and end parallel mode.
+ *
+ * Since writes are not allowed during parallel mode, copy updated index
+ * statistics from the DSM segment into local memory so that the caller can
+ * use them to update the index statistics.  One might think that we could
+ * exit from parallel mode, update the index statistics and then destroy the
+ * context, but that won't be safe (see ExitParallelMode).
+ */
+void
+end_parallel_vacuum(ParallelVacuumContext *pvc, IndexBulkDeleteResult **indstats)
+{
+	/* Copy the updated statistics */
+	for (int i = 0; i < pvc->nindexes; i++)
+	{
+		PVIndStats *stats = &(pvc->indstats[i]);
+
+		if (stats->istat_updated)
+		{
+			indstats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+			memcpy(indstats[i], &stats->istat, sizeof(IndexBulkDeleteResult));
+		}
+		else
+			indstats[i] = NULL;
+	}
+
+	DestroyParallelContext(pvc->pcxt);
+	ExitParallelMode();
+
+	pfree(pvc);
+}
+
+/* Returns the dead tuple space */
+VacDeadTuples *
+get_vacuum_dead_tuples(ParallelVacuumContext *pvc)
+{
+	return pvc->dead_tuples;
+}
+
+/*
+ * Do parallel index bulk-deletion with parallel workers.
+ */
+void
+perform_parallel_index_bulkdel(ParallelVacuumContext *pvc, long num_table_tuples)
+{
+	/*
+	 * We can only provide an approximate value of num_heap_tuples, at least
+	 * for now.  Matches serial VACUUM case.
+	 */
+	pvc->shared->num_table_tuples = num_table_tuples;
+	pvc->shared->estimated_count = true;
+
+	parallel_vacuum_all_indexes(pvc, true);
+}
+
+/*
+ * Do parallel index cleanup with parallel workers.
+ */
+void
+perform_parallel_index_cleanup(ParallelVacuumContext *pvc, long num_table_tuples,
+							   bool estimated_count)
+{
+	/*
+	 * We can provide a better estimate of total number of surviving
+	 * tuples (we assume indexes are more interested in that than in the
+	 * number of nominally live tuples).
+	 */
+	pvc->shared->num_table_tuples = num_table_tuples;
+	pvc->shared->estimated_count = estimated_count;
+
+	parallel_vacuum_all_indexes(pvc, false);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ *
+ * Since parallel vacuum workers perform only index vacuum or index cleanup,
+ * we don't need to report progress information.
+ */
+void
+parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
+{
+	Relation	rel;
+	Relation   *indrels;
+	PVIndStats *indstats;
+	PVShared   *shared;
+	VacDeadTuples	*dead_tuples;
+	BufferUsage *buffer_usage;
+	WalUsage   *wal_usage;
+	PVState		pvstate;
+	int			nindexes;
+	char	   *sharedquery;
+	ErrorContextCallback errcallback;
+
+	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
+										   false);
+	elog(DEBUG1, "starting parallel vacuum worker");
+
+	/* Set debug_query_string for individual workers */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
+	debug_query_string = sharedquery;
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/*
+	 * Open table.  The lock mode is the same as the leader process.  It's
+	 * okay because the lock mode does not conflict among the parallel
+	 * workers.
+	 */
+	rel = table_open(shared->relid, ShareUpdateExclusiveLock);
+
+	/*
+	 * Open all indexes.  indrels are sorted by OID, which should match the
+	 * order used by the leader.
+	 */
+	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
+	Assert(nindexes > 0);
+
+	/* Set index statistics */
+	indstats = (PVIndStats *) shm_toc_lookup(toc,
+											 PARALLEL_VACUUM_KEY_INDEX_STATS,
+											 false);
+
+	/* Set dead tuple space */
+	dead_tuples = (VacDeadTuples *) shm_toc_lookup(toc,
+												   PARALLEL_VACUUM_KEY_DEAD_TUPLES,
+												   false);
+
+	/* Set cost-based vacuum delay */
+	VacuumCostActive = (VacuumCostDelay > 0);
+	VacuumCostBalance = 0;
+	VacuumPageHit = 0;
+	VacuumPageMiss = 0;
+	VacuumPageDirty = 0;
+	VacuumCostBalanceLocal = 0;
+	VacuumSharedCostBalance = &(shared->cost_balance);
+	VacuumActiveNWorkers = &(shared->active_nworkers);
+
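+	/* Use the per-worker maintenance_work_mem computed by the leader */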
+	if (shared->maintenance_work_mem_worker > 0)
+		maintenance_work_mem = shared->maintenance_work_mem_worker;
+
+	/* Parallel vacuum state and the error callback arg */
+	pvstate.indrels = indrels;
+	pvstate.nindexes = nindexes;
+	pvstate.indstats = indstats;
+	pvstate.shared = shared;
+	pvstate.dead_tuples = dead_tuples;
+	pvstate.bstrategy = GetAccessStrategy(BAS_VACUUM);
+	pvstate.relnamespace = get_namespace_name(RelationGetNamespace(rel));
+	pvstate.relname = pstrdup(RelationGetRelationName(rel));
+	pvstate.indname = NULL;		/* filled later during index vacuuming */
+
+	/* Setup error traceback support for ereport() */
+	errcallback.callback = parallel_index_vacuum_error_callback;
+	errcallback.arg = &pvstate;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* Prepare to track buffer usage during parallel execution */
+	InstrStartParallelQuery();
+
+	/* Process indexes to perform bulk-deletion/cleanup */
+	parallel_vacuum_indexes(&pvstate);
+
+	/* Report buffer/WAL usage during parallel execution */
+	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
+	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+						  &wal_usage[ParallelWorkerNumber]);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
+	table_close(rel, ShareUpdateExclusiveLock);
+	FreeAccessStrategy(pvstate.bstrategy);
+}
+
+/*
+ * Compute the number of parallel worker processes to request.  Both index
+ * vacuum and index cleanup can be executed with parallel workers.  An index
+ * is eligible for parallel vacuum iff its size is at least
+ * min_parallel_index_scan_size, as invoking workers for very small indexes
+ * can hurt performance.
+ *
+ * nrequested is the number of parallel workers that the user requested.  If
+ * nrequested is 0, we compute the parallel degree based on the number of
+ * indexes that support parallel vacuum.
+ */
+static int
+compute_parallel_vacuum_workers(Relation *indrels, int nindexes, int nrequested)
+{
+	int			nindexes_parallel = 0;
+	int			nindexes_parallel_bulkdel = 0;
+	int			nindexes_parallel_cleanup = 0;
+	int			parallel_workers;
+
+	/*
+	 * We don't allow performing parallel operation in standalone backend or
+	 * when parallelism is disabled.
+	 */
+	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
+		return 0;
+
+	for (int i = 0; i < nindexes; i++)
+	{
+		Relation	indrel = indrels[i];
+		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
+			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
+			continue;
+
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+			nindexes_parallel_bulkdel++;
+		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
+			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+			nindexes_parallel_cleanup++;
+	}
+
+	nindexes_parallel = Max(nindexes_parallel_bulkdel,
+							nindexes_parallel_cleanup);
+
+	/* The leader process takes one index */
+	nindexes_parallel--;
+
+	/* No index supports parallel vacuum */
+	if (nindexes_parallel <= 0)
+		return 0;
+
+	/* Compute the parallel degree */
+	parallel_workers = (nrequested > 0) ?
+		Min(nrequested, nindexes_parallel) : nindexes_parallel;
+
+	/* Cap by max_parallel_maintenance_workers */
+	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+
+	return parallel_workers;
+}
+
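+/*
+ * Set the status of each index for the upcoming bulk-deletion or cleanup
+ * pass, and decide whether parallel workers can process it.
+ */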
+static void
+set_parallel_vacuum_index_status(ParallelVacuumContext *pvc, bool bulkdel)
+{
+	PVIndVacStatus new_status = bulkdel
+		? INDVAC_STATUS_NEED_BULKDELETE
+		: INDVAC_STATUS_NEED_CLEANUP;
+
+	/* Set index vacuum status and mark as parallel safe or not */
+	for (int i = 0; i < pvc->nindexes; i++)
+	{
+		PVIndStats *stats = &(pvc->indstats[i]);
+
+		Assert(stats->status == INDVAC_STATUS_INITIAL);
+
+		stats->status = new_status;
+		stats->parallel_workers_can_process =
+			index_parallel_vacuum_is_safe(pvc->indrels[i],
+										  pvc->num_index_scans,
+										  bulkdel);
+	}
+}
+
+/*
+ * Perform index vacuum or index cleanup with parallel workers.  This function
+ * must be used by the parallel vacuum leader process.
+ */
+static void
+parallel_vacuum_all_indexes(ParallelVacuumContext *pvc, bool bulkdel)
+{
+	int	nworkers;
+	PVState pvstate;
+
+	/* Determine the number of parallel workers to launch */
+	if (bulkdel)
+		nworkers = pvc->nindexes_parallel_bulkdel;
+	else
+	{
+		nworkers = pvc->nindexes_parallel_cleanup;
+
+		/* Add conditionally parallel-aware indexes on the first call */
+		if (pvc->num_index_scans == 0)
+			nworkers += pvc->nindexes_parallel_condcleanup;
+	}
+
+	/* The leader process will participate */
+	nworkers--;
+
+	/*
+	 * It is possible that parallel context is initialized with fewer workers
+	 * than the number of indexes that need a separate worker in the current
+	 * phase, so we need to consider it.  See compute_parallel_vacuum_workers.
+	 */
+	nworkers = Min(nworkers, pvc->pcxt->nworkers);
+
+	/* Reset the parallel index processing counter */
+	pg_atomic_write_u32(&(pvc->shared->idx), 0);
+
+	set_parallel_vacuum_index_status(pvc, bulkdel);
+
+	/* Setup the shared cost-based vacuum delay and launch workers */
+	if (nworkers > 0)
+	{
+		/* Reinitialize the parallel context to relaunch parallel workers */
+		if (pvc->num_index_scans > 0)
+			ReinitializeParallelDSM(pvc->pcxt);
+
+		/*
+		 * Set up shared cost balance and the number of active workers for
+		 * vacuum delay.  We need to do this before launching workers as
+		 * otherwise, they might not see the updated values for these
+		 * parameters.
+		 */
+		pg_atomic_write_u32(&(pvc->shared->cost_balance), VacuumCostBalance);
+		pg_atomic_write_u32(&(pvc->shared->active_nworkers), 0);
+
+		/*
+		 * The number of workers can vary between the bulkdelete and cleanup
+		 * phases.
+		 */
+		ReinitializeParallelWorkers(pvc->pcxt, nworkers);
+
+		LaunchParallelWorkers(pvc->pcxt);
+
+		if (pvc->pcxt->nworkers_launched > 0)
+		{
+			/*
+			 * Reset the local cost values for leader backend as we have
+			 * already accumulated the remaining balance of heap.
+			 */
+			VacuumCostBalance = 0;
+			VacuumCostBalanceLocal = 0;
+
+			/* Enable shared cost balance for leader backend */
+			VacuumSharedCostBalance = &(pvc->shared->cost_balance);
+			VacuumActiveNWorkers = &(pvc->shared->active_nworkers);
+		}
+
+		if (bulkdel)
+			ereport(pvc->shared->elevel,
+					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+									 pvc->pcxt->nworkers_launched),
+							pvc->pcxt->nworkers_launched, nworkers)));
+		else
+			ereport(pvc->shared->elevel,
+					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+									 pvc->pcxt->nworkers_launched),
+							pvc->pcxt->nworkers_launched, nworkers)));
+	}
+
+	pvstate.indrels = pvc->indrels;
+	pvstate.nindexes = pvc->nindexes;
+	pvstate.indstats = pvc->indstats;
+	pvstate.shared = pvc->shared;
+	pvstate.dead_tuples = pvc->dead_tuples;
+	pvstate.bstrategy = pvc->bstrategy;
+
+	/* Process the indexes that can be processed only by the leader process */
+	serial_vacuum_unsafe_indexes(&pvstate);
+
+	/*
+	 * Join as a parallel worker.  The leader process alone processes all
+	 * parallel-safe indexes in the case where no workers are launched.
+	 */
+	parallel_vacuum_indexes(&pvstate);
+
+	/*
+	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
+	 * to finish, or we might get incomplete data.)
+	 */
+	if (nworkers > 0)
+	{
+		/* Wait for all vacuum workers to finish */
+		WaitForParallelWorkersToFinish(pvc->pcxt);
+
+		for (int i = 0; i < pvc->pcxt->nworkers_launched; i++)
+			InstrAccumParallelQuery(&pvc->buffer_usage[i], &pvc->wal_usage[i]);
+	}
+
+	/*
+	 * Reset all index statuses back to the initial state (while checking
+	 * that we have processed all indexes).
+	 */
+	for (int i = 0; i < pvc->nindexes; i++)
+	{
+		PVIndStats *stats = &(pvc->indstats[i]);
+
+		Assert(stats->status == INDVAC_STATUS_COMPLETED);
+		stats->status = INDVAC_STATUS_INITIAL;
+	}
+
+	/*
+	 * Carry the shared balance value to heap scan and disable shared costing
+	 */
+	if (VacuumSharedCostBalance)
+	{
+		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+		VacuumSharedCostBalance = NULL;
+		VacuumActiveNWorkers = NULL;
+	}
+
+	/* Increment the count */
+	pvc->num_index_scans++;
+}
+
+/*
+ * Index vacuum/cleanup routine used by the leader process and parallel
+ * vacuum worker processes to process the indexes in parallel.
+ */
+static void
+parallel_vacuum_indexes(PVState *pvstate)
+{
+	/*
+	 * Increment the active worker count if we are able to launch any worker.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+	for (;;)
+	{
+		int idx;
+		PVIndStats *stats;
+
+		/* Get an index number to process */
+		idx = pg_atomic_fetch_add_u32(&(pvstate->shared->idx), 1);
+
+		/* Done for all indexes? */
+		if (idx >= pvstate->nindexes)
+			break;
+
+		stats = &(pvstate->indstats[idx]);
+
+		/*
+		 * Parallel-unsafe indexes can be processed only by the leader (they
+		 * are handled in serial_vacuum_unsafe_indexes()).
+		 */
+		if (!stats->parallel_workers_can_process)
+			continue;
+
+		parallel_vacuum_one_index(pvstate, pvstate->indrels[idx], stats);
+	}
+
+	/*
+	 * We have completed the index vacuum so decrement the active worker
+	 * count.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Perform serial processing of indexes in the leader process.
+ *
+ * Handles index vacuuming (or index cleanup) for indexes that are not
+ * parallel safe.
+ *
+ * Also performs processing of smaller indexes that fell under the size cutoff
+ * enforced by compute_parallel_vacuum_workers().
+ */
+static void
+serial_vacuum_unsafe_indexes(PVState *pvstate)
+{
+	Assert(!IsParallelWorker());
+
+	/*
+	 * Increment the active worker count if we are able to launch any worker.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+	for (int i = 0; i < pvstate->nindexes; i++)
+	{
+		PVIndStats *stats = &(pvstate->indstats[i]);
+
+		/* Skip parallel-safe indexes as they are processed by workers */
+		if (stats->parallel_workers_can_process)
+			continue;
+
+		parallel_vacuum_one_index(pvstate, pvstate->indrels[i], stats);
+	}
+
+	/*
+	 * We have completed the index vacuum so decrement the active worker
+	 * count.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Bulk-delete or clean up one index, either in the leader process or in one
+ * of the worker processes.  After processing the index, this function copies
+ * the index statistics returned from ambulkdelete or amvacuumcleanup to the
+ * DSM segment.
+ */
+static void
+parallel_vacuum_one_index(PVState *pvstate, Relation indrel, PVIndStats *stats)
+{
+	IndexBulkDeleteResult *istat = NULL;
+	IndexBulkDeleteResult *istat_res;
+	IndexVacuumInfo ivinfo;
+
+	/* Get the index statistics space, if already updated */
+	if (stats->istat_updated)
+		istat = &(stats->istat);
+
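+	/* Fill in the index vacuum information to pass to the index AM */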
+	ivinfo.index = indrel;
+	ivinfo.analyze_only = false;
+	ivinfo.report_progress = false;
+	ivinfo.message_level = pvstate->shared->elevel;
+	ivinfo.estimated_count = pvstate->shared->estimated_count;
+	ivinfo.num_heap_tuples = pvstate->shared->num_table_tuples;
+	ivinfo.strategy = pvstate->bstrategy;
+
+	/* Update error traceback information */
+	pvstate->indname = pstrdup(RelationGetRelationName(indrel));
+	pvstate->status = stats->status;
+
+	switch (stats->status)
+	{
+		case INDVAC_STATUS_NEED_BULKDELETE:
+			istat_res = vacuum_one_index(&ivinfo, istat, pvstate->dead_tuples);
+			break;
+		case INDVAC_STATUS_NEED_CLEANUP:
+			istat_res = cleanup_one_index(&ivinfo, istat);
+			break;
+		default:
+			elog(ERROR, "unexpected parallel index vacuum status %d",
+				 stats->status);
+	}
+
+	/*
+	 * Copy the index bulk-deletion result returned from ambulkdelete and
+	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
+	 * allocate locally and it's possible that an index will be vacuumed by a
+	 * different vacuum process the next cycle.  Copying the result normally
+	 * happens only the first time an index is vacuumed.  For any additional
+	 * vacuum pass, we directly point to the result on the DSM segment and
+	 * pass it to vacuum index APIs so that workers can update it directly.
+	 *
+	 * Since all vacuum workers write the bulk-deletion result at different
+	 * slots we can write them without locking.
+	 */
+	if (!stats->istat_updated && istat_res != NULL)
+	{
+		memcpy(&(stats->istat), istat_res, sizeof(IndexBulkDeleteResult));
+		stats->istat_updated = true;
+		pfree(istat_res);
+	}
+
+	/*
+	 * Update the status to completed. No need to lock here since each
+	 * worker touches different indexes.
+	 */
+	stats->status = INDVAC_STATUS_COMPLETED;
+
+	/* Reset error traceback information */
+	pfree(pvstate->indname);
+	pvstate->indname = NULL;
+	pvstate->status = INDVAC_STATUS_COMPLETED;
+}
+
+/*
+ * Returns false if the given index can't participate in parallel index
+ * vacuum or parallel index cleanup.
+ */
+static bool
+index_parallel_vacuum_is_safe(Relation indrel, int num_index_scans,
+							  bool bulkdel)
+{
+	uint8	vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+	/*
+	 * Check if the index is a totally unsuitable target for all parallel
+	 * processing up front.  For example, the index could be smaller than the
+	 * min_parallel_index_scan_size cutoff.
+	 */
+	if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
+		RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
+		return false;
+
+	/* In parallel vacuum case, check if it supports parallel bulk-deletion */
+	if (bulkdel)
+		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
+
+	/* Not safe, if the index does not support parallel cleanup */
+	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
+		return false;
+
+	/*
+	 * Not safe, if the index supports parallel cleanup conditionally,
+	 * but we have already processed the index (for bulkdelete).  See the
+	 * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
+	 * when indexes support parallel cleanup conditionally.
+	 */
+	if (num_index_scans > 0 &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+		return false;
+
+	return true;
+}
+
+/*
+ * Error context callback for errors occurring during parallel index vacuum.
+ */
+static void
+parallel_index_vacuum_error_callback(void *arg)
+{
+	PVState *errinfo = arg;
+
+	switch (errinfo->status)
+	{
+		case INDVAC_STATUS_NEED_BULKDELETE:
+			errcontext("while parallelly vacuuming index \"%s\" of relation \"%s.%s\"",
+					   errinfo->indname,
+					   errinfo->relnamespace,
+					   errinfo->relname);
+			break;
+		case INDVAC_STATUS_NEED_CLEANUP:
+			errcontext("while parallelly cleanup index \"%s\" of relation \"%s.%s\"",
+					   errinfo->indname,
+					   errinfo->relnamespace,
+					   errinfo->relname);
+			break;
+		case INDVAC_STATUS_INITIAL:
+		case INDVAC_STATUS_COMPLETED:
+		default:
+			break;
+	}
+}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index e63b49fc38..4a8022fee7 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -198,7 +198,6 @@ extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation rel,
 							struct VacuumParams *params, BufferAccessStrategy bstrategy);
-extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 4cfd52eaf4..3dc9055715 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -15,6 +15,8 @@
 #define VACUUM_H
 
 #include "access/htup.h"
+#include "access/genam.h"
+#include "access/parallel.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
@@ -62,6 +64,28 @@
 /* value for checking vacuum flags */
 #define VACUUM_OPTION_MAX_VALID_VALUE		((1 << 3) - 1)
 
+/* The actual struct definition is private to vacuumparallel.c */
+typedef struct ParallelVacuumContext ParallelVacuumContext;
+
+/* Parameter data structure for begin_parallel_vacuum() */
+typedef struct ParallelVacuumCtl
+{
+	/* Table and its indexes */
+	Relation	rel;
+	Relation	*indrels;
+	int			nindexes;
+
+	/* The number of workers requested to launch */
+	int		nrequested_workers;
+
+	/* The maximum number of dead tuples to store */
+	long	maxtuples;
+
+	/* Log level and the buffer access strategy */
+	int		elevel;
+	BufferAccessStrategy bstrategy;
+} ParallelVacuumCtl;
+
 /*----------
  * ANALYZE builds one of these structs for each attribute (column) that is
  * to be analyzed.  The struct and subsidiary data are in anl_context,
@@ -230,6 +254,28 @@ typedef struct VacuumParams
 	int			nworkers;
 } VacuumParams;
 
+/*
+ * VacDeadTuples stores the dead tuple TIDs collected during the heap scan.
+ * This is allocated in the DSM segment in parallel mode and in local memory
+ * in non-parallel mode.
+ */
+typedef struct VacDeadTuples
+{
+	int			max_tuples;		/* # slots allocated in array */
+	int			num_tuples;		/* current # of entries */
+	/* List of TIDs of tuples we intend to delete */
+	/* NB: this list is ordered by TID address */
+	ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER];	/* array of
+														 * ItemPointerData */
+} VacDeadTuples;
+
+/* The dead tuple space consists of VacDeadTuples and dead tuple TIDs */
+#define SizeOfDeadTuples(cnt) \
+	add_size(offsetof(VacDeadTuples, itemptrs), \
+			 mul_size(sizeof(ItemPointerData), cnt))
+#define MAXDEADTUPLES(max_size) \
+		(((max_size) - offsetof(VacDeadTuples, itemptrs)) / sizeof(ItemPointerData))
+
 /* GUC parameters */
 extern PGDLLIMPORT int default_statistics_target;	/* PGDLLIMPORT for PostGIS */
 extern int	vacuum_freeze_min_age;
@@ -282,6 +328,23 @@ extern bool vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple,
 extern Relation vacuum_open_relation(Oid relid, RangeVar *relation,
 									 bits32 options, bool verbose,
 									 LOCKMODE lmode);
+extern IndexBulkDeleteResult *vacuum_one_index(IndexVacuumInfo *ivinfo,
+											   IndexBulkDeleteResult *istat,
+											   VacDeadTuples *dead_tuples);
+extern IndexBulkDeleteResult *cleanup_one_index(IndexVacuumInfo *ivinfo,
+												IndexBulkDeleteResult *istat);
+
+/* in commands/vacuumparallel.c */
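+/*
+ * A parallel vacuum is driven by calling begin_parallel_vacuum() once (a
+ * NULL result means no parallel workers can be used), then
+ * perform_parallel_index_bulkdel() and/or perform_parallel_index_cleanup()
+ * for each pass over the indexes, and finally end_parallel_vacuum() to copy
+ * the collected index statistics back to the caller.
+ */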
+extern ParallelVacuumContext *begin_parallel_vacuum(ParallelVacuumCtl *pvctl);
+extern void end_parallel_vacuum(ParallelVacuumContext *pvc,
+								IndexBulkDeleteResult **indstats);
+extern VacDeadTuples *get_vacuum_dead_tuples(ParallelVacuumContext *pvc);
+extern void perform_parallel_index_bulkdel(ParallelVacuumContext *pvc,
+										   long num_table_tuples);
+extern void perform_parallel_index_cleanup(ParallelVacuumContext *pvc,
+											long num_table_tuples,
+											bool estimated_count);
+extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in commands/analyze.c */
 extern void analyze_rel(Oid relid, RangeVar *relation,
diff --git a/src/test/regress/expected/vacuum_parallel.out b/src/test/regress/expected/vacuum_parallel.out
index ddf0ee544b..a7d8a801e0 100644
--- a/src/test/regress/expected/vacuum_parallel.out
+++ b/src/test/regress/expected/vacuum_parallel.out
@@ -45,5 +45,29 @@ VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table;
 INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i;
 RESET max_parallel_maintenance_workers;
 RESET min_parallel_index_scan_size;
+CREATE TABLE parallel_vacuum_table2 (a int, b int4[]) WITH (autovacuum_enabled = off);
+INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 10000) g;
+-- Create different types of indexes, i.e. having different parallelvacuumoptions.
+-- Also create a small index, same as above.
+CREATE INDEX pv_bt_index ON parallel_vacuum_table2 USING btree (a);
+CREATE INDEX pv_hash_index ON parallel_vacuum_table2 USING hash (a);
+CREATE INDEX pv_gin_index ON parallel_vacuum_table2 USING gin (b);
+CREATE INDEX pv_brin_index ON parallel_vacuum_table2 USING brin (a);
+CREATE INDEX pv_small_index ON parallel_vacuum_table2 USING btree ((1));
+-- Parallel index vacuum for various types of indexes.
+DELETE FROM parallel_vacuum_table2;
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+-- Parallel index cleanup.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+-- XXX: in order to execute index scan twice, we need about 200,000 garbage tuples
+-- with the minimum maintenance_work_mem.  However, it takes a long time to load.
+INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 200000) g;
+DELETE FROM parallel_vacuum_table2;
+SET maintenance_work_mem TO 1024;
+-- Parallel index vacuum for various types of indexes.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+-- Parallel index cleanup.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+RESET maintenance_work_mem;
 -- Deliberately don't drop table, to get further coverage from tools like
 -- pg_amcheck in some testing scenarios
diff --git a/src/test/regress/sql/vacuum_parallel.sql b/src/test/regress/sql/vacuum_parallel.sql
index 1d23f33e39..49f4f4ce6d 100644
--- a/src/test/regress/sql/vacuum_parallel.sql
+++ b/src/test/regress/sql/vacuum_parallel.sql
@@ -42,5 +42,40 @@ INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i;
 RESET max_parallel_maintenance_workers;
 RESET min_parallel_index_scan_size;
 
+CREATE TABLE parallel_vacuum_table2 (a int, b int4[]) WITH (autovacuum_enabled = off);
+INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 10000) g;
+
+-- Create different types of indexes, i.e. having different parallelvacuumoptions.
+-- Also create a small index, same as above.
+CREATE INDEX pv_bt_index ON parallel_vacuum_table2 USING btree (a);
+CREATE INDEX pv_hash_index ON parallel_vacuum_table2 USING hash (a);
+CREATE INDEX pv_gin_index ON parallel_vacuum_table2 USING gin (b);
+CREATE INDEX pv_brin_index ON parallel_vacuum_table2 USING brin (a);
+CREATE INDEX pv_small_index ON parallel_vacuum_table2 USING btree ((1));
+
+
+-- Parallel index vacuum for various types of indexes.
+DELETE FROM parallel_vacuum_table2;
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+
+-- Parallel index cleanup.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+
+-- XXX: in order to execute index scan twice, we need about 200,000 garbage tuples
+-- with the minimum maintenance_work_mem.  However, it takes a long time to load.
+INSERT INTO parallel_vacuum_table2 SELECT g, ARRAY[1, 2, g] FROM generate_series(1, 200000) g;
+
+DELETE FROM parallel_vacuum_table2;
+
+SET maintenance_work_mem TO 1024;
+
+-- Parallel index vacuum for various types of indexes.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+
+-- Parallel index cleanup.
+VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table2;
+
+RESET maintenance_work_mem;
+
 -- Deliberately don't drop table, to get further coverage from tools like
 -- pg_amcheck in some testing scenarios
