diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6f8b1b7929..cf8c6614cd 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2630,6 +2630,12 @@ static const TableAmRoutine heapam_methods = {
 	.relation_copy_data = heapam_relation_copy_data,
 	.relation_copy_for_cluster = heapam_relation_copy_for_cluster,
 	.relation_vacuum = heap_vacuum_rel,
+
+	.parallel_vacuum_compute_workers = heap_parallel_vacuum_compute_workers,
+	.parallel_vacuum_estimate = heap_parallel_vacuum_estimate,
+	.parallel_vacuum_initialize = heap_parallel_vacuum_initialize,
+	.parallel_vacuum_scan_worker = heap_parallel_vacuum_scan_worker,
+
 	.scan_analyze_next_block = heapam_scan_analyze_next_block,
 	.scan_analyze_next_tuple = heapam_scan_analyze_next_tuple,
 	.index_build_range_scan = heapam_index_build_range_scan,
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 3f88cf1e8e..ca44d04e66 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -49,6 +49,7 @@
 #include "common/int.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
+#include "optimizer/paths.h"
 #include "pgstat.h"
 #include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
@@ -117,10 +118,22 @@
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
 /*
- * Macro to check if we are in a parallel vacuum.  If true, we are in the
- * parallel mode and the DSM segment is initialized.
+ * DSM keys for heap parallel vacuum scan. Unlike other parallel execution code, we
+ * we don't need to worry about DSM keys conflicting with plan_node_id, but need to
+ * avoid conflicting with DSM keys used in vacuumparallel.c.
+ */
+#define LV_PARALLEL_SCAN_SHARED			0xFFFF0001
+#define LV_PARALLEL_SCAN_DESC			0xFFFF0002
+#define LV_PARALLEL_SCAN_DESC_WORKER	0xFFFF0003
+
+/*
+ * Macro to check if we are in a parallel vacuum.  If ParallelVacuumIsActive() is
+ * true, we are in the parallel mode, meaning that we do either parallel index
+ * vacuuming or parallel table vacuuming, or both. If ParallelHeapVacuumIsActive()
+ * is true, we do at least parallel table vacuuming.
  */
 #define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
+#define ParallelHeapVacuumIsActive(vacrel) ((vacrel)->phvstate != NULL)
 
 /* Phases of vacuum during which we report error context. */
 typedef enum
@@ -133,6 +146,80 @@ typedef enum
 	VACUUM_ERRCB_PHASE_TRUNCATE,
 } VacErrPhase;
 
+/*
+ * Relation statistics collected during heap scanning and need to be shared among
+ * parallel vacuum workers.
+ */
+typedef struct LVRelCounters
+{
+	BlockNumber scanned_pages;	/* # pages examined (not skipped via VM) */
+	BlockNumber removed_pages;	/* # pages removed by relation truncation */
+	BlockNumber frozen_pages;	/* # pages with newly frozen tuples */
+	BlockNumber lpdead_item_pages;	/* # pages with LP_DEAD items */
+	BlockNumber missed_dead_pages;	/* # pages with missed dead tuples */
+	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+
+	/* Counters that follow are only for scanned_pages */
+	int64		tuples_deleted; /* # deleted from table */
+	int64		tuples_frozen;	/* # newly frozen */
+	int64		lpdead_items;	/* # deleted from indexes */
+	int64		live_tuples;	/* # live tuples remaining */
+	int64		recently_dead_tuples;	/* # dead, but not yet removable */
+	int64		missed_dead_tuples; /* # removable, but not removed */
+
+	/* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid. */
+	TransactionId NewRelfrozenXid;
+	MultiXactId NewRelminMxid;
+	bool		skippedallvis;
+}			LVRelCounters;
+
+/*
+ * Struct for information that need to be shared among parallel vacuum workers
+ */
+typedef struct PHVShared
+{
+	bool		aggressive;
+	bool		skipwithvm;
+
+	/* The initial values shared by the leader process */
+	TransactionId NewRelfrozenXid;
+	MultiXactId NewRelminMxid;
+	bool		skippedallvis;
+
+	/* VACUUM operation's cutoffs for freezing and pruning */
+	struct VacuumCutoffs cutoffs;
+	GlobalVisState vistest;
+
+	LVRelCounters worker_relcnts[FLEXIBLE_ARRAY_MEMBER];
+}			PHVShared;
+#define SizeOfPHVShared (offsetof(PHVShared, worker_relcnts))
+
+/* Per-worker scan state */
+typedef struct PHVScanWorkerState
+{
+	ParallelBlockTableScanWorkerData state;
+	bool		maybe_have_blocks;
+}			PHVScanWorkerState;
+
+/* Struct for parallel heap vacuum */
+typedef struct PHVState
+{
+	/* Parallel scan description shared among parallel workers */
+	ParallelBlockTableScanDesc pscandesc;
+
+	/* Shared information */
+	PHVShared  *shared;
+
+	/* Per-worker scan state */
+	PHVScanWorkerState *myscanstate;
+
+	/* Points to all per-worker scan state array */
+	PHVScanWorkerState *scanstates;
+
+	/* The number of workers launched for parallel heap vacuum */
+	int			nworkers_launched;
+}			PHVState;
+
 typedef struct LVRelState
 {
 	/* Target heap relation and its indexes */
@@ -144,6 +231,12 @@ typedef struct LVRelState
 	BufferAccessStrategy bstrategy;
 	ParallelVacuumState *pvs;
 
+	/* Parallel heap vacuum state and sizes for each struct */
+	PHVState   *phvstate;
+	Size		pscan_len;
+	Size		shared_len;
+	Size		pscanwork_len;
+
 	/* Aggressive VACUUM? (must set relfrozenxid >= FreezeLimit) */
 	bool		aggressive;
 	/* Use visibility map to skip? (disabled by DISABLE_PAGE_SKIPPING) */
@@ -159,10 +252,6 @@ typedef struct LVRelState
 	/* VACUUM operation's cutoffs for freezing and pruning */
 	struct VacuumCutoffs cutoffs;
 	GlobalVisState *vistest;
-	/* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid */
-	TransactionId NewRelfrozenXid;
-	MultiXactId NewRelminMxid;
-	bool		skippedallvis;
 
 	/* Error reporting state */
 	char	   *dbname;
@@ -188,12 +277,10 @@ typedef struct LVRelState
 	VacDeadItemsInfo *dead_items_info;
 
 	BlockNumber rel_pages;		/* total number of pages */
-	BlockNumber scanned_pages;	/* # pages examined (not skipped via VM) */
-	BlockNumber removed_pages;	/* # pages removed by relation truncation */
-	BlockNumber frozen_pages;	/* # pages with newly frozen tuples */
-	BlockNumber lpdead_item_pages;	/* # pages with LP_DEAD items */
-	BlockNumber missed_dead_pages;	/* # pages with missed dead tuples */
-	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+	BlockNumber next_fsm_block_to_vacuum;
+
+	/* Block and tuple counters for the relation */
+	LVRelCounters *counters;
 
 	/* Statistics output by us, for table */
 	double		new_rel_tuples; /* new estimated total # of tuples */
@@ -203,13 +290,6 @@ typedef struct LVRelState
 
 	/* Instrumentation counters */
 	int			num_index_scans;
-	/* Counters that follow are only for scanned_pages */
-	int64		tuples_deleted; /* # deleted from table */
-	int64		tuples_frozen;	/* # newly frozen */
-	int64		lpdead_items;	/* # deleted from indexes */
-	int64		live_tuples;	/* # live tuples remaining */
-	int64		recently_dead_tuples;	/* # dead, but not yet removable */
-	int64		missed_dead_tuples; /* # removable, but not removed */
 
 	/* State maintained by heap_vac_scan_next_block() */
 	BlockNumber current_block;	/* last block returned */
@@ -229,6 +309,7 @@ typedef struct LVSavedErrInfo
 
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
+static bool do_lazy_scan_heap(LVRelState *vacrel);
 static bool heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
 									 bool *all_visible_according_to_vm);
 static void find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis);
@@ -271,6 +352,12 @@ static void dead_items_cleanup(LVRelState *vacrel);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
 static void update_relstats_all_indexes(LVRelState *vacrel);
+
+
+static void do_parallel_lazy_scan_heap(LVRelState *vacrel);
+static void parallel_heap_vacuum_gather_scan_stats(LVRelState *vacrel);
+static void parallel_heap_complete_unfinised_scan(LVRelState *vacrel);
+
 static void vacuum_error_callback(void *arg);
 static void update_vacuum_error_info(LVRelState *vacrel,
 									 LVSavedErrInfo *saved_vacrel,
@@ -296,6 +383,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 				BufferAccessStrategy bstrategy)
 {
 	LVRelState *vacrel;
+	LVRelCounters *counters;
 	bool		verbose,
 				instrument,
 				skipwithvm,
@@ -406,14 +494,28 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 		Assert(params->index_cleanup == VACOPTVALUE_AUTO);
 	}
 
+	vacrel->next_fsm_block_to_vacuum = 0;
+
 	/* Initialize page counters explicitly (be tidy) */
-	vacrel->scanned_pages = 0;
-	vacrel->removed_pages = 0;
-	vacrel->frozen_pages = 0;
-	vacrel->lpdead_item_pages = 0;
-	vacrel->missed_dead_pages = 0;
-	vacrel->nonempty_pages = 0;
-	/* dead_items_alloc allocates vacrel->dead_items later on */
+	counters = palloc(sizeof(LVRelCounters));
+	counters->scanned_pages = 0;
+	counters->removed_pages = 0;
+	counters->frozen_pages = 0;
+	counters->lpdead_item_pages = 0;
+	counters->missed_dead_pages = 0;
+	counters->nonempty_pages = 0;
+
+	/* Initialize remaining counters (be tidy) */
+	counters->tuples_deleted = 0;
+	counters->tuples_frozen = 0;
+	counters->lpdead_items = 0;
+	counters->live_tuples = 0;
+	counters->recently_dead_tuples = 0;
+	counters->missed_dead_tuples = 0;
+
+	vacrel->counters = counters;
+
+	vacrel->num_index_scans = 0;
 
 	/* Allocate/initialize output statistics state */
 	vacrel->new_rel_tuples = 0;
@@ -421,14 +523,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	vacrel->indstats = (IndexBulkDeleteResult **)
 		palloc0(vacrel->nindexes * sizeof(IndexBulkDeleteResult *));
 
-	/* Initialize remaining counters (be tidy) */
-	vacrel->num_index_scans = 0;
-	vacrel->tuples_deleted = 0;
-	vacrel->tuples_frozen = 0;
-	vacrel->lpdead_items = 0;
-	vacrel->live_tuples = 0;
-	vacrel->recently_dead_tuples = 0;
-	vacrel->missed_dead_tuples = 0;
+	/* dead_items_alloc allocates vacrel->dead_items later on */
 
 	/*
 	 * Get cutoffs that determine which deleted tuples are considered DEAD,
@@ -450,9 +545,9 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	vacrel->rel_pages = orig_rel_pages = RelationGetNumberOfBlocks(rel);
 	vacrel->vistest = GlobalVisTestFor(rel);
 	/* Initialize state used to track oldest extant XID/MXID */
-	vacrel->NewRelfrozenXid = vacrel->cutoffs.OldestXmin;
-	vacrel->NewRelminMxid = vacrel->cutoffs.OldestMxact;
-	vacrel->skippedallvis = false;
+	vacrel->counters->NewRelfrozenXid = vacrel->cutoffs.OldestXmin;
+	vacrel->counters->NewRelminMxid = vacrel->cutoffs.OldestMxact;
+	vacrel->counters->skippedallvis = false;
 	skipwithvm = true;
 	if (params->options & VACOPT_DISABLE_PAGE_SKIPPING)
 	{
@@ -533,15 +628,15 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	 * value >= FreezeLimit, and relminmxid to a value >= MultiXactCutoff.
 	 * Non-aggressive VACUUMs may advance them by any amount, or not at all.
 	 */
-	Assert(vacrel->NewRelfrozenXid == vacrel->cutoffs.OldestXmin ||
+	Assert(vacrel->counters->NewRelfrozenXid == vacrel->cutoffs.OldestXmin ||
 		   TransactionIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.FreezeLimit :
 										 vacrel->cutoffs.relfrozenxid,
-										 vacrel->NewRelfrozenXid));
-	Assert(vacrel->NewRelminMxid == vacrel->cutoffs.OldestMxact ||
+										 vacrel->counters->NewRelfrozenXid));
+	Assert(vacrel->counters->NewRelminMxid == vacrel->cutoffs.OldestMxact ||
 		   MultiXactIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.MultiXactCutoff :
 									   vacrel->cutoffs.relminmxid,
-									   vacrel->NewRelminMxid));
-	if (vacrel->skippedallvis)
+									   vacrel->counters->NewRelminMxid));
+	if (vacrel->counters->skippedallvis)
 	{
 		/*
 		 * Must keep original relfrozenxid in a non-aggressive VACUUM that
@@ -549,8 +644,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 		 * values will have missed unfrozen XIDs from the pages we skipped.
 		 */
 		Assert(!vacrel->aggressive);
-		vacrel->NewRelfrozenXid = InvalidTransactionId;
-		vacrel->NewRelminMxid = InvalidMultiXactId;
+		vacrel->counters->NewRelfrozenXid = InvalidTransactionId;
+		vacrel->counters->NewRelminMxid = InvalidMultiXactId;
 	}
 
 	/*
@@ -571,7 +666,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	 */
 	vac_update_relstats(rel, new_rel_pages, vacrel->new_live_tuples,
 						new_rel_allvisible, vacrel->nindexes > 0,
-						vacrel->NewRelfrozenXid, vacrel->NewRelminMxid,
+						vacrel->counters->NewRelfrozenXid, vacrel->counters->NewRelminMxid,
 						&frozenxid_updated, &minmulti_updated, false);
 
 	/*
@@ -587,8 +682,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	pgstat_report_vacuum(RelationGetRelid(rel),
 						 rel->rd_rel->relisshared,
 						 Max(vacrel->new_live_tuples, 0),
-						 vacrel->recently_dead_tuples +
-						 vacrel->missed_dead_tuples);
+						 vacrel->counters->recently_dead_tuples +
+						 vacrel->counters->missed_dead_tuples);
 	pgstat_progress_end_command();
 
 	if (instrument)
@@ -651,21 +746,21 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 							 vacrel->relname,
 							 vacrel->num_index_scans);
 			appendStringInfo(&buf, _("pages: %u removed, %u remain, %u scanned (%.2f%% of total)\n"),
-							 vacrel->removed_pages,
+							 vacrel->counters->removed_pages,
 							 new_rel_pages,
-							 vacrel->scanned_pages,
+							 vacrel->counters->scanned_pages,
 							 orig_rel_pages == 0 ? 100.0 :
-							 100.0 * vacrel->scanned_pages / orig_rel_pages);
+							 100.0 * vacrel->counters->scanned_pages / orig_rel_pages);
 			appendStringInfo(&buf,
 							 _("tuples: %lld removed, %lld remain, %lld are dead but not yet removable\n"),
-							 (long long) vacrel->tuples_deleted,
+							 (long long) vacrel->counters->tuples_deleted,
 							 (long long) vacrel->new_rel_tuples,
-							 (long long) vacrel->recently_dead_tuples);
-			if (vacrel->missed_dead_tuples > 0)
+							 (long long) vacrel->counters->recently_dead_tuples);
+			if (vacrel->counters->missed_dead_tuples > 0)
 				appendStringInfo(&buf,
 								 _("tuples missed: %lld dead from %u pages not removed due to cleanup lock contention\n"),
-								 (long long) vacrel->missed_dead_tuples,
-								 vacrel->missed_dead_pages);
+								 (long long) vacrel->counters->missed_dead_tuples,
+								 vacrel->counters->missed_dead_pages);
 			diff = (int32) (ReadNextTransactionId() -
 							vacrel->cutoffs.OldestXmin);
 			appendStringInfo(&buf,
@@ -673,25 +768,25 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 							 vacrel->cutoffs.OldestXmin, diff);
 			if (frozenxid_updated)
 			{
-				diff = (int32) (vacrel->NewRelfrozenXid -
+				diff = (int32) (vacrel->counters->NewRelfrozenXid -
 								vacrel->cutoffs.relfrozenxid);
 				appendStringInfo(&buf,
 								 _("new relfrozenxid: %u, which is %d XIDs ahead of previous value\n"),
-								 vacrel->NewRelfrozenXid, diff);
+								 vacrel->counters->NewRelfrozenXid, diff);
 			}
 			if (minmulti_updated)
 			{
-				diff = (int32) (vacrel->NewRelminMxid -
+				diff = (int32) (vacrel->counters->NewRelminMxid -
 								vacrel->cutoffs.relminmxid);
 				appendStringInfo(&buf,
 								 _("new relminmxid: %u, which is %d MXIDs ahead of previous value\n"),
-								 vacrel->NewRelminMxid, diff);
+								 vacrel->counters->NewRelminMxid, diff);
 			}
 			appendStringInfo(&buf, _("frozen: %u pages from table (%.2f%% of total) had %lld tuples frozen\n"),
-							 vacrel->frozen_pages,
+							 vacrel->counters->frozen_pages,
 							 orig_rel_pages == 0 ? 100.0 :
-							 100.0 * vacrel->frozen_pages / orig_rel_pages,
-							 (long long) vacrel->tuples_frozen);
+							 100.0 * vacrel->counters->frozen_pages / orig_rel_pages,
+							 (long long) vacrel->counters->tuples_frozen);
 			if (vacrel->do_index_vacuuming)
 			{
 				if (vacrel->nindexes == 0 || vacrel->num_index_scans == 0)
@@ -711,10 +806,10 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 				msgfmt = _("%u pages from table (%.2f%% of total) have %lld dead item identifiers\n");
 			}
 			appendStringInfo(&buf, msgfmt,
-							 vacrel->lpdead_item_pages,
+							 vacrel->counters->lpdead_item_pages,
 							 orig_rel_pages == 0 ? 100.0 :
-							 100.0 * vacrel->lpdead_item_pages / orig_rel_pages,
-							 (long long) vacrel->lpdead_items);
+							 100.0 * vacrel->counters->lpdead_item_pages / orig_rel_pages,
+							 (long long) vacrel->counters->lpdead_items);
 			for (int i = 0; i < vacrel->nindexes; i++)
 			{
 				IndexBulkDeleteResult *istat = vacrel->indstats[i];
@@ -815,14 +910,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel)
 {
-	BlockNumber rel_pages = vacrel->rel_pages,
-				blkno,
-				next_fsm_block_to_vacuum = 0;
-	bool		all_visible_according_to_vm;
-
-	TidStore   *dead_items = vacrel->dead_items;
+	BlockNumber rel_pages = vacrel->rel_pages;
 	VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info;
-	Buffer		vmbuffer = InvalidBuffer;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -842,6 +931,70 @@ lazy_scan_heap(LVRelState *vacrel)
 	vacrel->next_unskippable_allvis = false;
 	vacrel->next_unskippable_vmbuffer = InvalidBuffer;
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		do_parallel_lazy_scan_heap(vacrel);
+	else
+		do_lazy_scan_heap(vacrel);
+
+	vacrel->blkno = InvalidBlockNumber;
+
+	/* report that everything is now scanned */
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, rel_pages);
+
+	/* now we can compute the new value for pg_class.reltuples */
+	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
+													 vacrel->counters->scanned_pages,
+													 vacrel->counters->live_tuples);
+
+	/*
+	 * Also compute the total number of surviving heap entries.  In the
+	 * (unlikely) scenario that new_live_tuples is -1, take it as zero.
+	 */
+	vacrel->new_rel_tuples =
+		Max(vacrel->new_live_tuples, 0) + vacrel->counters->recently_dead_tuples +
+		vacrel->counters->missed_dead_tuples;
+
+	/*
+	 * Do index vacuuming (call each index's ambulkdelete routine), then do
+	 * related heap vacuuming
+	 */
+	if (dead_items_info->num_items > 0)
+		lazy_vacuum(vacrel);
+
+	/*
+	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
+	 * not there were indexes, and whether or not we bypassed index vacuuming.
+	 */
+	if (rel_pages > vacrel->next_fsm_block_to_vacuum)
+		FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
+								rel_pages);
+
+	/* report all blocks vacuumed */
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages);
+
+	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
+	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
+		lazy_cleanup_all_indexes(vacrel);
+}
+
+/*
+ * Workhorse for lazy_scan_heap().
+ *
+ * Return true if we processed all blocks, otherwise false if we exit from this function
+ * while not completing the heap scan due to full of dead item TIDs. In serial heap scan
+ * case, this function always returns true. In parallel heap vacuum scan, this function
+ * is called by both worker processes and the leader process, and could return false.
+ */
+static bool
+do_lazy_scan_heap(LVRelState *vacrel)
+{
+	bool		all_visible_according_to_vm;
+	TidStore   *dead_items = vacrel->dead_items;
+	VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info;
+	BlockNumber blkno;
+	Buffer		vmbuffer = InvalidBuffer;
+	bool		scan_done = true;
+
 	while (heap_vac_scan_next_block(vacrel, &blkno, &all_visible_according_to_vm))
 	{
 		Buffer		buf;
@@ -849,7 +1002,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		bool		has_lpdead_items;
 		bool		got_cleanup_lock = false;
 
-		vacrel->scanned_pages++;
+		vacrel->counters->scanned_pages++;
 
 		/* Report as block scanned, update error traceback information */
 		pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
@@ -867,46 +1020,10 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * one-pass strategy, and the two-pass strategy with the index_cleanup
 		 * param set to 'off'.
 		 */
-		if (vacrel->scanned_pages % FAILSAFE_EVERY_PAGES == 0)
+		if (!IsParallelWorker() &&
+			vacrel->counters->scanned_pages % FAILSAFE_EVERY_PAGES == 0)
 			lazy_check_wraparound_failsafe(vacrel);
 
-		/*
-		 * Consider if we definitely have enough space to process TIDs on page
-		 * already.  If we are close to overrunning the available space for
-		 * dead_items TIDs, pause and do a cycle of vacuuming before we tackle
-		 * this page.
-		 */
-		if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes)
-		{
-			/*
-			 * Before beginning index vacuuming, we release any pin we may
-			 * hold on the visibility map page.  This isn't necessary for
-			 * correctness, but we do it anyway to avoid holding the pin
-			 * across a lengthy, unrelated operation.
-			 */
-			if (BufferIsValid(vmbuffer))
-			{
-				ReleaseBuffer(vmbuffer);
-				vmbuffer = InvalidBuffer;
-			}
-
-			/* Perform a round of index and heap vacuuming */
-			vacrel->consider_bypass_optimization = false;
-			lazy_vacuum(vacrel);
-
-			/*
-			 * Vacuum the Free Space Map to make newly-freed space visible on
-			 * upper-level FSM pages.  Note we have not yet processed blkno.
-			 */
-			FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum,
-									blkno);
-			next_fsm_block_to_vacuum = blkno;
-
-			/* Report that we are once again scanning the heap */
-			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
-		}
-
 		/*
 		 * Pin the visibility map page in case we need to mark the page
 		 * all-visible.  In most cases this will be very cheap, because we'll
@@ -994,10 +1111,14 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * also be no opportunity to update the FSM later, because we'll never
 		 * revisit this page. Since updating the FSM is desirable but not
 		 * absolutely required, that's OK.
+		 *
+		 * XXX: in parallel heap scan, some blocks before blkno might not been
+		 * processed yet. Is it worth vacuuming FSM?
 		 */
-		if (vacrel->nindexes == 0
-			|| !vacrel->do_index_vacuuming
-			|| !has_lpdead_items)
+		if (!IsParallelWorker() &&
+			(vacrel->nindexes == 0
+			 || !vacrel->do_index_vacuuming
+			 || !has_lpdead_items))
 		{
 			Size		freespace = PageGetHeapFreeSpace(page);
 
@@ -1011,57 +1132,154 @@ lazy_scan_heap(LVRelState *vacrel)
 			 * held the cleanup lock and lazy_scan_prune() was called.
 			 */
 			if (got_cleanup_lock && vacrel->nindexes == 0 && has_lpdead_items &&
-				blkno - next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
+				blkno - vacrel->next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
 			{
-				FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum,
+				FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
 										blkno);
-				next_fsm_block_to_vacuum = blkno;
+				vacrel->next_fsm_block_to_vacuum = blkno;
 			}
 		}
 		else
 			UnlockReleaseBuffer(buf);
+
+		/*
+		 * Consider if we definitely have enough space to process TIDs on page
+		 * already.  If we are close to overrunning the available space for
+		 * dead_items TIDs, pause and do a cycle of vacuuming before we tackle
+		 * this page.
+		 */
+		if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes)
+		{
+			/*
+			 * Before beginning index vacuuming, we release any pin we may
+			 * hold on the visibility map page.  This isn't necessary for
+			 * correctness, but we do it anyway to avoid holding the pin
+			 * across a lengthy, unrelated operation.
+			 */
+			if (BufferIsValid(vmbuffer))
+			{
+				ReleaseBuffer(vmbuffer);
+				vmbuffer = InvalidBuffer;
+			}
+
+			if (ParallelHeapVacuumIsActive(vacrel))
+			{
+				/*
+				 * In parallel heap vacuum case, both the leader process and
+				 * the worker processes have to exit without invoking index
+				 * and heap vacuuming. The leader process will wait for all
+				 * workers to finish and perform index and heap vacuuming.
+				 */
+				scan_done = false;
+				break;
+			}
+
+			/* Perform a round of index and heap vacuuming */
+			vacrel->consider_bypass_optimization = false;
+			lazy_vacuum(vacrel);
+
+			/*
+			 * Vacuum the Free Space Map to make newly-freed space visible on
+			 * upper-level FSM pages.
+			 *
+			 * XXX: in parallel heap scan, some blocks before blkno might not
+			 * been processed yet. Is it worth vacuuming FSM?
+			 */
+			FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
+									blkno + 1);
+			vacrel->next_fsm_block_to_vacuum = blkno;
+
+			/* Report that we are once again scanning the heap */
+			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+
+			continue;
+		}
 	}
 
-	vacrel->blkno = InvalidBlockNumber;
 	if (BufferIsValid(vmbuffer))
 		ReleaseBuffer(vmbuffer);
 
-	/* report that everything is now scanned */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
+	return scan_done;
+}
 
-	/* now we can compute the new value for pg_class.reltuples */
-	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
-													 vacrel->scanned_pages,
-													 vacrel->live_tuples);
+/*
+ * A parallel scan variant of heap_vac_scan_next_block.
+ *
+ * In parallel vacuum scan, we don't use the SKIP_PAGES_THRESHOLD optimization.
+ */
+static bool
+heap_vac_scan_next_block_parallel(LVRelState *vacrel, BlockNumber *blkno,
+								  bool *all_visible_according_to_vm)
+{
+	PHVState   *phvstate = vacrel->phvstate;
+	BlockNumber next_block;
+	Buffer		vmbuffer = InvalidBuffer;
+	uint8		mapbits = 0;
 
-	/*
-	 * Also compute the total number of surviving heap entries.  In the
-	 * (unlikely) scenario that new_live_tuples is -1, take it as zero.
-	 */
-	vacrel->new_rel_tuples =
-		Max(vacrel->new_live_tuples, 0) + vacrel->recently_dead_tuples +
-		vacrel->missed_dead_tuples;
+	Assert(ParallelHeapVacuumIsActive(vacrel));
 
-	/*
-	 * Do index vacuuming (call each index's ambulkdelete routine), then do
-	 * related heap vacuuming
-	 */
-	if (dead_items_info->num_items > 0)
-		lazy_vacuum(vacrel);
+	for (;;)
+	{
+		next_block = table_block_parallelscan_nextpage(vacrel->rel,
+													   &(phvstate->myscanstate->state),
+													   phvstate->pscandesc);
 
-	/*
-	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
-	 * not there were indexes, and whether or not we bypassed index vacuuming.
-	 */
-	if (blkno > next_fsm_block_to_vacuum)
-		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, blkno);
+		/* Have we reached the end of the table? */
+		if (!BlockNumberIsValid(next_block) || next_block >= vacrel->rel_pages)
+		{
+			if (BufferIsValid(vmbuffer))
+				ReleaseBuffer(vmbuffer);
 
-	/* report all blocks vacuumed */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
+			*blkno = vacrel->rel_pages;
+			return false;
+		}
 
-	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
-	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
-		lazy_cleanup_all_indexes(vacrel);
+		/* We always treat the last block as unsafe to skip */
+		if (next_block == vacrel->rel_pages - 1)
+			break;
+
+		mapbits = visibilitymap_get_status(vacrel->rel, next_block, &vmbuffer);
+
+		/*
+		 * A block is unskippable if it is not all visible according to the
+		 * visibility map.
+		 */
+		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) == 0)
+		{
+			Assert((mapbits & VISIBILITYMAP_ALL_FROZEN) == 0);
+			break;
+		}
+
+		/* DISABLE_PAGE_SKIPPING makes all skipping unsafe */
+		if (!vacrel->skipwithvm)
+			break;
+
+		/*
+		 * Aggressive VACUUM caller can't skip pages just because they are
+		 * all-visible.
+		 */
+		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) == 0)
+		{
+
+			if (vacrel->aggressive)
+				break;
+
+			/*
+			 * All-visible block is safe to skip in non-aggressive case. But
+			 * remember that the final range contains such a block for later.
+			 */
+			vacrel->counters->skippedallvis = true;
+		}
+	}
+
+	if (BufferIsValid(vmbuffer))
+		ReleaseBuffer(vmbuffer);
+
+	*blkno = next_block;
+	*all_visible_according_to_vm = (mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0;
+
+	return true;
 }
 
 /*
@@ -1088,6 +1306,9 @@ heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
 {
 	BlockNumber next_block;
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		return heap_vac_scan_next_block_parallel(vacrel, blkno, all_visible_according_to_vm);
+
 	/* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */
 	next_block = vacrel->current_block + 1;
 
@@ -1137,7 +1358,7 @@ heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
 		{
 			next_block = vacrel->next_unskippable_block;
 			if (skipsallvis)
-				vacrel->skippedallvis = true;
+				vacrel->counters->skippedallvis = true;
 		}
 	}
 
@@ -1210,11 +1431,12 @@ find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis)
 
 		/*
 		 * Caller must scan the last page to determine whether it has tuples
-		 * (caller must have the opportunity to set vacrel->nonempty_pages).
-		 * This rule avoids having lazy_truncate_heap() take access-exclusive
-		 * lock on rel to attempt a truncation that fails anyway, just because
-		 * there are tuples on the last page (it is likely that there will be
-		 * tuples on other nearby pages as well, but those can be skipped).
+		 * (caller must have the opportunity to set
+		 * vacrel->counters->nonempty_pages). This rule avoids having
+		 * lazy_truncate_heap() take access-exclusive lock on rel to attempt a
+		 * truncation that fails anyway, just because there are tuples on the
+		 * last page (it is likely that there will be tuples on other nearby
+		 * pages as well, but those can be skipped).
 		 *
 		 * Implement this by always treating the last block as unsafe to skip.
 		 */
@@ -1439,10 +1661,10 @@ lazy_scan_prune(LVRelState *vacrel,
 	heap_page_prune_and_freeze(rel, buf, vacrel->vistest, prune_options,
 							   &vacrel->cutoffs, &presult, PRUNE_VACUUM_SCAN,
 							   &vacrel->offnum,
-							   &vacrel->NewRelfrozenXid, &vacrel->NewRelminMxid);
+							   &vacrel->counters->NewRelfrozenXid, &vacrel->counters->NewRelminMxid);
 
 	Assert(MultiXactIdIsValid(vacrel->NewRelminMxid));
-	Assert(TransactionIdIsValid(vacrel->NewRelfrozenXid));
+	Assert(TransactionIdIsValid(vacrel->counters->NewRelfrozenXid));
 
 	if (presult.nfrozen > 0)
 	{
@@ -1451,7 +1673,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		 * nfrozen == 0, since it only counts pages with newly frozen tuples
 		 * (don't confuse that with pages newly set all-frozen in VM).
 		 */
-		vacrel->frozen_pages++;
+		vacrel->counters->frozen_pages++;
 	}
 
 	/*
@@ -1486,7 +1708,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 */
 	if (presult.lpdead_items > 0)
 	{
-		vacrel->lpdead_item_pages++;
+		vacrel->counters->lpdead_item_pages++;
 
 		/*
 		 * deadoffsets are collected incrementally in
@@ -1501,15 +1723,15 @@ lazy_scan_prune(LVRelState *vacrel,
 	}
 
 	/* Finally, add page-local counts to whole-VACUUM counts */
-	vacrel->tuples_deleted += presult.ndeleted;
-	vacrel->tuples_frozen += presult.nfrozen;
-	vacrel->lpdead_items += presult.lpdead_items;
-	vacrel->live_tuples += presult.live_tuples;
-	vacrel->recently_dead_tuples += presult.recently_dead_tuples;
+	vacrel->counters->tuples_deleted += presult.ndeleted;
+	vacrel->counters->tuples_frozen += presult.nfrozen;
+	vacrel->counters->lpdead_items += presult.lpdead_items;
+	vacrel->counters->live_tuples += presult.live_tuples;
+	vacrel->counters->recently_dead_tuples += presult.recently_dead_tuples;
 
 	/* Can't truncate this page */
 	if (presult.hastup)
-		vacrel->nonempty_pages = blkno + 1;
+		vacrel->counters->nonempty_pages = blkno + 1;
 
 	/* Did we find LP_DEAD items? */
 	*has_lpdead_items = (presult.lpdead_items > 0);
@@ -1659,8 +1881,8 @@ lazy_scan_noprune(LVRelState *vacrel,
 				missed_dead_tuples;
 	bool		hastup;
 	HeapTupleHeader tupleheader;
-	TransactionId NoFreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
-	MultiXactId NoFreezePageRelminMxid = vacrel->NewRelminMxid;
+	TransactionId NoFreezePageRelfrozenXid = vacrel->counters->NewRelfrozenXid;
+	MultiXactId NoFreezePageRelminMxid = vacrel->counters->NewRelminMxid;
 	OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
 
 	Assert(BufferGetBlockNumber(buf) == blkno);
@@ -1787,8 +2009,8 @@ lazy_scan_noprune(LVRelState *vacrel,
 	 * this particular page until the next VACUUM.  Remember its details now.
 	 * (lazy_scan_prune expects a clean slate, so we have to do this last.)
 	 */
-	vacrel->NewRelfrozenXid = NoFreezePageRelfrozenXid;
-	vacrel->NewRelminMxid = NoFreezePageRelminMxid;
+	vacrel->counters->NewRelfrozenXid = NoFreezePageRelfrozenXid;
+	vacrel->counters->NewRelminMxid = NoFreezePageRelminMxid;
 
 	/* Save any LP_DEAD items found on the page in dead_items */
 	if (vacrel->nindexes == 0)
@@ -1815,25 +2037,25 @@ lazy_scan_noprune(LVRelState *vacrel,
 		 * indexes will be deleted during index vacuuming (and then marked
 		 * LP_UNUSED in the heap)
 		 */
-		vacrel->lpdead_item_pages++;
+		vacrel->counters->lpdead_item_pages++;
 
 		dead_items_add(vacrel, blkno, deadoffsets, lpdead_items);
 
-		vacrel->lpdead_items += lpdead_items;
+		vacrel->counters->lpdead_items += lpdead_items;
 	}
 
 	/*
 	 * Finally, add relevant page-local counts to whole-VACUUM counts
 	 */
-	vacrel->live_tuples += live_tuples;
-	vacrel->recently_dead_tuples += recently_dead_tuples;
-	vacrel->missed_dead_tuples += missed_dead_tuples;
+	vacrel->counters->live_tuples += live_tuples;
+	vacrel->counters->recently_dead_tuples += recently_dead_tuples;
+	vacrel->counters->missed_dead_tuples += missed_dead_tuples;
 	if (missed_dead_tuples > 0)
-		vacrel->missed_dead_pages++;
+		vacrel->counters->missed_dead_pages++;
 
 	/* Can't truncate this page */
 	if (hastup)
-		vacrel->nonempty_pages = blkno + 1;
+		vacrel->counters->nonempty_pages = blkno + 1;
 
 	/* Did we find LP_DEAD items? */
 	*has_lpdead_items = (lpdead_items > 0);
@@ -1862,7 +2084,7 @@ lazy_vacuum(LVRelState *vacrel)
 
 	/* Should not end up here with no indexes */
 	Assert(vacrel->nindexes > 0);
-	Assert(vacrel->lpdead_item_pages > 0);
+	Assert(vacrel->counters->lpdead_item_pages > 0);
 
 	if (!vacrel->do_index_vacuuming)
 	{
@@ -1896,7 +2118,7 @@ lazy_vacuum(LVRelState *vacrel)
 		BlockNumber threshold;
 
 		Assert(vacrel->num_index_scans == 0);
-		Assert(vacrel->lpdead_items == vacrel->dead_items_info->num_items);
+		Assert(vacrel->counters->lpdead_items == vacrel->dead_items_info->num_items);
 		Assert(vacrel->do_index_vacuuming);
 		Assert(vacrel->do_index_cleanup);
 
@@ -1923,7 +2145,7 @@ lazy_vacuum(LVRelState *vacrel)
 		 * cases then this may need to be reconsidered.
 		 */
 		threshold = (double) vacrel->rel_pages * BYPASS_THRESHOLD_PAGES;
-		bypass = (vacrel->lpdead_item_pages < threshold &&
+		bypass = (vacrel->counters->lpdead_item_pages < threshold &&
 				  (TidStoreMemoryUsage(vacrel->dead_items) < (32L * 1024L * 1024L)));
 	}
 
@@ -2061,7 +2283,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	 * place).
 	 */
 	Assert(vacrel->num_index_scans > 0 ||
-		   vacrel->dead_items_info->num_items == vacrel->lpdead_items);
+		   vacrel->dead_items_info->num_items == vacrel->counters->lpdead_items);
 	Assert(allindexes || VacuumFailsafeActive);
 
 	/*
@@ -2165,8 +2387,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 	 * the second heap pass.  No more, no less.
 	 */
 	Assert(vacrel->num_index_scans > 1 ||
-		   (vacrel->dead_items_info->num_items == vacrel->lpdead_items &&
-			vacuumed_pages == vacrel->lpdead_item_pages));
+		   (vacrel->dead_items_info->num_items == vacrel->counters->lpdead_items &&
+			vacuumed_pages == vacrel->counters->lpdead_item_pages));
 
 	ereport(DEBUG2,
 			(errmsg("table \"%s\": removed %lld dead item identifiers in %u pages",
@@ -2347,7 +2569,7 @@ static void
 lazy_cleanup_all_indexes(LVRelState *vacrel)
 {
 	double		reltuples = vacrel->new_rel_tuples;
-	bool		estimated_count = vacrel->scanned_pages < vacrel->rel_pages;
+	bool		estimated_count = vacrel->counters->scanned_pages < vacrel->rel_pages;
 	const int	progress_start_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_INDEXES_TOTAL
@@ -2528,7 +2750,7 @@ should_attempt_truncation(LVRelState *vacrel)
 	if (!vacrel->do_rel_truncate || VacuumFailsafeActive)
 		return false;
 
-	possibly_freeable = vacrel->rel_pages - vacrel->nonempty_pages;
+	possibly_freeable = vacrel->rel_pages - vacrel->counters->nonempty_pages;
 	if (possibly_freeable > 0 &&
 		(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
 		 possibly_freeable >= vacrel->rel_pages / REL_TRUNCATE_FRACTION))
@@ -2554,7 +2776,7 @@ lazy_truncate_heap(LVRelState *vacrel)
 
 	/* Update error traceback information one last time */
 	update_vacuum_error_info(vacrel, NULL, VACUUM_ERRCB_PHASE_TRUNCATE,
-							 vacrel->nonempty_pages, InvalidOffsetNumber);
+							 vacrel->counters->nonempty_pages, InvalidOffsetNumber);
 
 	/*
 	 * Loop until no more truncating can be done.
@@ -2655,7 +2877,7 @@ lazy_truncate_heap(LVRelState *vacrel)
 		 * without also touching reltuples, since the tuple count wasn't
 		 * changed by the truncation.
 		 */
-		vacrel->removed_pages += orig_rel_pages - new_rel_pages;
+		vacrel->counters->removed_pages += orig_rel_pages - new_rel_pages;
 		vacrel->rel_pages = new_rel_pages;
 
 		ereport(vacrel->verbose ? INFO : DEBUG2,
@@ -2663,7 +2885,7 @@ lazy_truncate_heap(LVRelState *vacrel)
 						vacrel->relname,
 						orig_rel_pages, new_rel_pages)));
 		orig_rel_pages = new_rel_pages;
-	} while (new_rel_pages > vacrel->nonempty_pages && lock_waiter_detected);
+	} while (new_rel_pages > vacrel->counters->nonempty_pages && lock_waiter_detected);
 }
 
 /*
@@ -2691,7 +2913,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected)
 	StaticAssertStmt((PREFETCH_SIZE & (PREFETCH_SIZE - 1)) == 0,
 					 "prefetch size must be power of 2");
 	prefetchedUntil = InvalidBlockNumber;
-	while (blkno > vacrel->nonempty_pages)
+	while (blkno > vacrel->counters->nonempty_pages)
 	{
 		Buffer		buf;
 		Page		page;
@@ -2803,7 +3025,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected)
 	 * pages still are; we need not bother to look at the last known-nonempty
 	 * page.
 	 */
-	return vacrel->nonempty_pages;
+	return vacrel->counters->nonempty_pages;
 }
 
 /*
@@ -2821,12 +3043,8 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 		autovacuum_work_mem != -1 ?
 		autovacuum_work_mem : maintenance_work_mem;
 
-	/*
-	 * Initialize state for a parallel vacuum.  As of now, only one worker can
-	 * be used for an index, so we invoke parallelism only if there are at
-	 * least two indexes on a table.
-	 */
-	if (nworkers >= 0 && vacrel->nindexes > 1 && vacrel->do_index_vacuuming)
+	/* Initialize state for a parallel vacuum */
+	if (nworkers >= 0)
 	{
 		/*
 		 * Since parallel workers cannot access data in temporary tables, we
@@ -2844,11 +3062,18 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 								vacrel->relname)));
 		}
 		else
+		{
+			/*
+			 * For parallel index vacuuming, only one worker can be used for
+			 * an index, we invoke parallelism only if there are at least two
+			 * indexes on a table.
+			 */
 			vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
 											   vacrel->nindexes, nworkers,
 											   vac_work_mem,
 											   vacrel->verbose ? INFO : DEBUG2,
-											   vacrel->bstrategy);
+											   vacrel->bstrategy, (void *) vacrel);
+		}
 
 		/*
 		 * If parallel mode started, dead_items and dead_items_info spaces are
@@ -2889,9 +3114,19 @@ dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
 	};
 	int64		prog_val[2];
 
+	/*
+	 * Protect both dead_items and dead_items_info from concurrent updates in
+	 * parallel heap scan cases.
+	 */
+	if (ParallelHeapVacuumIsActive(vacrel))
+		TidStoreLockExclusive(dead_items);
+
 	TidStoreSetBlockOffsets(dead_items, blkno, offsets, num_offsets);
 	vacrel->dead_items_info->num_items += num_offsets;
 
+	if (ParallelHeapVacuumIsActive(vacrel))
+		TidStoreUnlock(dead_items);
+
 	/* update the progress information */
 	prog_val[0] = vacrel->dead_items_info->num_items;
 	prog_val[1] = TidStoreMemoryUsage(dead_items);
@@ -3093,6 +3328,357 @@ update_relstats_all_indexes(LVRelState *vacrel)
 	}
 }
 
+/*
+ * Compute the number of parallel workers for parallel vacuum heap scan.
+ *
+ * The calculation logic is borrowed from compute_parallel_worker().
+ */
+int
+heap_parallel_vacuum_compute_workers(Relation rel, int nrequested)
+{
+	int			parallel_workers = 0;
+	int			heap_parallel_threshold;
+	int			heap_pages;
+
+	if (nrequested == 0)
+	{
+		/*
+		 * Select the number of workers based on the log of the size of the
+		 * relation.  This probably needs to be a good deal more
+		 * sophisticated, but we need something here for now.  Note that the
+		 * upper limit of the min_parallel_table_scan_size GUC is chosen to
+		 * prevent overflow here.
+		 */
+		heap_parallel_threshold = Max(min_parallel_table_scan_size, 1);
+		heap_pages = RelationGetNumberOfBlocks(rel);
+		while (heap_pages >= (BlockNumber) (heap_parallel_threshold * 3))
+		{
+			parallel_workers++;
+			heap_parallel_threshold *= 3;
+			if (heap_parallel_threshold > INT_MAX / 3)
+				break;
+		}
+	}
+	else
+		parallel_workers = nrequested;
+
+	return parallel_workers;
+}
+
+/*
+ * Compute the amount of space we'll need in the parallel heap vacuum
+ * DSM, and inform pcxt->estimator about our needs.
+ *
+ * nworkers is the number of workers for the table vacuum. Note that it could
+ * be different than pcxt->nworkers since it is the maximum of number of
+ * workers for table vacuum and index vacuum.
+ */
+void
+heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt,
+							  int nworkers, void *state)
+{
+	Size		size = 0;
+	LVRelState *vacrel = (LVRelState *) state;
+
+	/* space for PHVShared */
+	size = add_size(size, SizeOfPHVShared);
+	size = add_size(size, mul_size(sizeof(LVRelCounters), nworkers));
+	vacrel->shared_len = size;
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* space for ParallelBlockTableScanDesc */
+	vacrel->pscan_len = table_block_parallelscan_estimate(rel);
+	shm_toc_estimate_chunk(&pcxt->estimator, vacrel->pscan_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* space for per-worker scan state, PHVScanWorkerState */
+	vacrel->pscanwork_len = mul_size(sizeof(PHVScanWorkerState), nworkers);
+	shm_toc_estimate_chunk(&pcxt->estimator, vacrel->pscanwork_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Set up shared memory for parallel heap vacuum.
+ */
+void
+heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt,
+								int nworkers, void *state)
+{
+	LVRelState *vacrel = (LVRelState *) state;
+	ParallelBlockTableScanDesc pscan;
+	PHVScanWorkerState *pscanwork;
+	PHVShared  *shared;
+	PHVState   *phvstate;
+
+	phvstate = (PHVState *) palloc(sizeof(PHVState));
+
+	shared = shm_toc_allocate(pcxt->toc, vacrel->shared_len);
+
+	/* Prepare the shared information */
+
+	MemSet(shared, 0, vacrel->shared_len);
+	shared->aggressive = vacrel->aggressive;
+	shared->skipwithvm = vacrel->skipwithvm;
+	shared->cutoffs = vacrel->cutoffs;
+	shared->NewRelfrozenXid = vacrel->counters->NewRelfrozenXid;
+	shared->NewRelminMxid = vacrel->counters->NewRelminMxid;
+	shared->skippedallvis = vacrel->counters->skippedallvis;
+
+	/*
+	 * XXX: we copy the contents of vistest to the shared area, but in order
+	 * to do that, we need to either expose GlobalVisTest or to provide
+	 * functions to copy contents of GlobalVisTest to somewhere. Currently we
+	 * do the former but not sure it's the best choice.
+	 *
+	 * Alternative idea is to have each worker determine cutoff and have their
+	 * own vistest. But we need to carefully consider it since parallel
+	 * workers end up having different cutoff and horizon.
+	 */
+	shared->vistest = *vacrel->vistest;
+
+	shm_toc_insert(pcxt->toc, LV_PARALLEL_SCAN_SHARED, shared);
+
+	phvstate->shared = shared;
+
+	/* prepare the  parallel block table scan description */
+	pscan = shm_toc_allocate(pcxt->toc, vacrel->pscan_len);
+	shm_toc_insert(pcxt->toc, LV_PARALLEL_SCAN_DESC, pscan);
+
+	/* initialize parallel scan description */
+	table_block_parallelscan_initialize(rel, (ParallelTableScanDesc) pscan);
+	phvstate->pscandesc = pscan;
+
+	/* prepare the workers' parallel block table scan state */
+	pscanwork = shm_toc_allocate(pcxt->toc, vacrel->pscanwork_len);
+	MemSet(pscanwork, 0, vacrel->pscanwork_len);
+	shm_toc_insert(pcxt->toc, LV_PARALLEL_SCAN_DESC_WORKER, pscanwork);
+	phvstate->scanstates = pscanwork;
+
+	vacrel->phvstate = phvstate;
+}
+
+/*
+ * Main function for parallel heap vacuum workers.
+ */
+void
+heap_parallel_vacuum_scan_worker(Relation rel, ParallelVacuumState *pvs,
+								 ParallelWorkerContext *pwcxt)
+{
+	LVRelState	vacrel = {0};
+	PHVState   *phvstate;
+	PHVShared  *shared;
+	ParallelBlockTableScanDesc pscandesc;
+	PHVScanWorkerState *scanstate;
+	LVRelCounters *counters;
+	bool		scan_done;
+
+	phvstate = palloc(sizeof(PHVState));
+
+	pscandesc = (ParallelBlockTableScanDesc) shm_toc_lookup(pwcxt->toc,
+															LV_PARALLEL_SCAN_DESC,
+															false);
+	phvstate->pscandesc = pscandesc;
+
+	shared = (PHVShared *) shm_toc_lookup(pwcxt->toc, LV_PARALLEL_SCAN_SHARED,
+										  false);
+	phvstate->shared = shared;
+
+	scanstate = (PHVScanWorkerState *) shm_toc_lookup(pwcxt->toc,
+													  LV_PARALLEL_SCAN_DESC_WORKER,
+													  false);
+
+	phvstate->myscanstate = &(scanstate[ParallelWorkerNumber]);
+	counters = &(shared->worker_relcnts[ParallelWorkerNumber]);
+
+	/* Prepare LVRelState */
+	vacrel.rel = rel;
+	vacrel.indrels = parallel_vacuum_get_table_indexes(pvs, &vacrel.nindexes);
+	vacrel.pvs = pvs;
+	vacrel.phvstate = phvstate;
+	vacrel.aggressive = shared->aggressive;
+	vacrel.skipwithvm = shared->skipwithvm;
+	vacrel.cutoffs = shared->cutoffs;
+	vacrel.vistest = &(shared->vistest);
+	vacrel.dead_items = parallel_vacuum_get_dead_items(pvs,
+													   &vacrel.dead_items_info);
+	vacrel.rel_pages = RelationGetNumberOfBlocks(rel);
+	vacrel.counters = counters;
+
+	/* initialize per-worker relation statistics */
+	MemSet(counters, 0, sizeof(LVRelCounters));
+
+	vacrel.counters->NewRelfrozenXid = shared->NewRelfrozenXid;
+	vacrel.counters->NewRelminMxid = shared->NewRelminMxid;
+	vacrel.counters->skippedallvis = shared->skippedallvis;
+
+	/*
+	 * XXX: the following fields are not set yet: - index vacuum related
+	 * fields such as consider_bypass_optimization, do_index_vacuuming etc. -
+	 * error reporting state. - statistics such as scanned_pages etc. - oldest
+	 * extant XID/MXID. - states maintained by heap_vac_scan_next_block()
+	 */
+
+	/* Initialize the start block if not yet */
+	if (!phvstate->myscanstate->maybe_have_blocks)
+	{
+		table_block_parallelscan_startblock_init(rel,
+												 &(phvstate->myscanstate->state),
+												 phvstate->pscandesc);
+
+		phvstate->myscanstate->maybe_have_blocks = false;
+	}
+
+	/*
+	 * XXX: if we want to support parallel heap *vacuum*, we need to allow
+	 * workers to call different function based on the shared information.
+	 */
+	scan_done = do_lazy_scan_heap(&vacrel);
+
+	phvstate->myscanstate->maybe_have_blocks = !scan_done;
+}
+
+/*
+ * Complete parallel heaps scans that have remaining blocks in their
+ * chunks.
+ */
+static void
+parallel_heap_complete_unfinised_scan(LVRelState *vacrel)
+{
+	int			nworkers;
+
+	Assert(!IsParallelWorker());
+
+	nworkers = parallel_vacuum_get_nworkers_table(vacrel->pvs);
+
+	for (int i = 0; i < nworkers; i++)
+	{
+		PHVScanWorkerState *wstate = &(vacrel->phvstate->scanstates[i]);
+		bool		scan_done PG_USED_FOR_ASSERTS_ONLY;
+
+		if (!wstate->maybe_have_blocks)
+			continue;
+
+		vacrel->phvstate->myscanstate = wstate;
+
+		scan_done = do_lazy_scan_heap(vacrel);
+
+		Assert(scan_done);
+	}
+}
+
+/*
+ * Accumulate relation counters that parallel workers collected into the
+ * leader's counters.
+ */
+static void
+parallel_heap_vacuum_gather_scan_stats(LVRelState *vacrel)
+{
+	PHVState   *phvstate = vacrel->phvstate;
+
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+
+	for (int i = 0; i < phvstate->nworkers_launched; i++)
+	{
+		LVRelCounters *counters = &(phvstate->shared->worker_relcnts[i]);
+
+#define LV_ACCUM_ITEM(item) (vacrel)->counters->item += (counters)->item
+
+		LV_ACCUM_ITEM(scanned_pages);
+		LV_ACCUM_ITEM(removed_pages);
+		LV_ACCUM_ITEM(frozen_pages);
+		LV_ACCUM_ITEM(lpdead_item_pages);
+		LV_ACCUM_ITEM(missed_dead_pages);
+		LV_ACCUM_ITEM(nonempty_pages);
+		LV_ACCUM_ITEM(tuples_deleted);
+		LV_ACCUM_ITEM(tuples_frozen);
+		LV_ACCUM_ITEM(lpdead_items);
+		LV_ACCUM_ITEM(live_tuples);
+		LV_ACCUM_ITEM(recently_dead_tuples);
+		LV_ACCUM_ITEM(missed_dead_tuples);
+
+#undef LV_ACCUM_ITEM
+
+		if (TransactionIdPrecedes(counters->NewRelfrozenXid, vacrel->counters->NewRelfrozenXid))
+			vacrel->counters->NewRelfrozenXid = counters->NewRelfrozenXid;
+
+		if (MultiXactIdPrecedesOrEquals(counters->NewRelminMxid, vacrel->counters->NewRelminMxid))
+			vacrel->counters->NewRelminMxid = counters->NewRelminMxid;
+
+		if (!vacrel->counters->skippedallvis && counters->skippedallvis)
+			vacrel->counters->skippedallvis = true;
+	}
+}
+
+/*
+ * A parallel variant of do_lazy_scan_heap(). The leader process launches parallel
+ * workers to scan the heap in parallel.
+ */
+static void
+do_parallel_lazy_scan_heap(LVRelState *vacrel)
+{
+	PHVScanWorkerState *scanstate;
+	TidStore   *dead_items = vacrel->dead_items;
+	VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info;
+
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+	Assert(!IsParallelWorker());
+
+	/* launcher workers */
+	vacrel->phvstate->nworkers_launched = parallel_vacuum_table_scan_begin(vacrel->pvs);
+
+	/* initialize parallel scan description to join as a worker */
+	scanstate = palloc(sizeof(PHVScanWorkerState));
+	table_block_parallelscan_startblock_init(vacrel->rel, &(scanstate->state),
+											 vacrel->phvstate->pscandesc);
+	vacrel->phvstate->myscanstate = scanstate;
+
+	for (;;)
+	{
+		bool		scan_done PG_USED_FOR_ASSERTS_ONLY;
+
+		/*
+		 * Scan the table until either we are close to overrunning the
+		 * available space for dead_items TIDs or we reach the end of the
+		 * table.
+		 */
+		scan_done = do_lazy_scan_heap(vacrel);
+
+		/* stop parallel workers and gather the collected stats */
+		parallel_vacuum_table_scan_end(vacrel->pvs);
+		parallel_heap_vacuum_gather_scan_stats(vacrel);
+
+		/*
+		 * Consider if we definitely have enough space to process TIDs on page
+		 * already.  If we are close to overrunning the available space for
+		 * dead_items TIDs, pause and do a cycle of vacuuming before we tackle
+		 * this page.
+		 */
+		if (TidStoreMemoryUsage(dead_items) > dead_items_info->max_bytes)
+		{
+			/* Perform a round of index and heap vacuuming */
+			vacrel->consider_bypass_optimization = false;
+			lazy_vacuum(vacrel);
+
+			/* Report that we are once again scanning the heap */
+			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+
+			/* re-launcher workers */
+			vacrel->phvstate->nworkers_launched =
+				parallel_vacuum_table_scan_begin(vacrel->pvs);
+
+			continue;
+		}
+
+		/* We reach the end of the table */
+		Assert(scan_done);
+		break;
+	}
+
+	parallel_heap_complete_unfinised_scan(vacrel);
+}
+
 /*
  * Error context callback for errors occurring during vacuum.  The error
  * context messages for index phases should match the messages set in parallel
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index f26070bff2..e1759da69a 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -28,6 +28,7 @@
 
 #include "access/amapi.h"
 #include "access/table.h"
+#include "access/tableam.h"
 #include "access/xact.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
@@ -64,6 +65,12 @@ typedef struct PVShared
 	Oid			relid;
 	int			elevel;
 
+	/*
+	 * True if the caller wants parallel workers to invoke vacuum table scan
+	 * callback.
+	 */
+	bool		do_vacuum_table_scan;
+
 	/*
 	 * Fields for both index vacuum and cleanup.
 	 *
@@ -163,6 +170,9 @@ struct ParallelVacuumState
 	/* NULL for worker processes */
 	ParallelContext *pcxt;
 
+	/* Passed to parallel table scan workers. NULL for leader process */
+	ParallelWorkerContext *pwcxt;
+
 	/* Parent Heap Relation */
 	Relation	heaprel;
 
@@ -192,6 +202,16 @@ struct ParallelVacuumState
 	/* Points to WAL usage area in DSM */
 	WalUsage   *wal_usage;
 
+	/*
+	 * The number of workers for parallel table scan/vacuuming and index
+	 * vacuuming, respectively.
+	 */
+	int			nworkers_for_table;
+	int			nworkers_for_index;
+
+	/* How many parallel table vacuum scan is called? */
+	int			num_table_scans;
+
 	/*
 	 * False if the index is totally unsuitable target for all parallel
 	 * processing. For example, the index could be <
@@ -220,8 +240,9 @@ struct ParallelVacuumState
 	PVIndVacStatus status;
 };
 
-static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
-											bool *will_parallel_vacuum);
+static void parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes,
+											int nrequested, int *nworkers_table,
+											int *nworkers_index, bool *will_parallel_vacuum);
 static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
 												bool vacuum);
 static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
@@ -241,7 +262,7 @@ static void parallel_vacuum_error_callback(void *arg);
 ParallelVacuumState *
 parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 					 int nrequested_workers, int vac_work_mem,
-					 int elevel, BufferAccessStrategy bstrategy)
+					 int elevel, BufferAccessStrategy bstrategy, void *state)
 {
 	ParallelVacuumState *pvs;
 	ParallelContext *pcxt;
@@ -255,6 +276,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	Size		est_shared_len;
 	int			nindexes_mwm = 0;
 	int			parallel_workers = 0;
+	int			nworkers_table;
+	int			nworkers_index;
 	int			querylen;
 
 	/*
@@ -262,15 +285,17 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	 * relation
 	 */
 	Assert(nrequested_workers >= 0);
-	Assert(nindexes > 0);
 
 	/*
 	 * Compute the number of parallel vacuum workers to launch
 	 */
 	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
-	parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
-													   nrequested_workers,
-													   will_parallel_vacuum);
+	parallel_vacuum_compute_workers(rel, indrels, nindexes, nrequested_workers,
+									&nworkers_table, &nworkers_index,
+									will_parallel_vacuum);
+
+	parallel_workers = Max(nworkers_table, nworkers_index);
+
 	if (parallel_workers <= 0)
 	{
 		/* Can't perform vacuum in parallel -- return NULL */
@@ -284,6 +309,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	pvs->will_parallel_vacuum = will_parallel_vacuum;
 	pvs->bstrategy = bstrategy;
 	pvs->heaprel = rel;
+	pvs->nworkers_for_table = nworkers_table;
+	pvs->nworkers_for_index = nworkers_index;
 
 	EnterParallelMode();
 	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
@@ -326,6 +353,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	else
 		querylen = 0;			/* keep compiler quiet */
 
+	/* Estimate AM-specific space for parallel table vacuum */
+	if (nworkers_table > 0)
+		table_parallel_vacuum_estimate(rel, pcxt, nworkers_table, state);
+
 	InitializeParallelDSM(pcxt);
 
 	/* Prepare index vacuum stats */
@@ -417,6 +448,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
 	}
 
+	/* Prepare AM-specific DSM for parallel table vacuum */
+	if (nworkers_table > 0)
+		table_parallel_vacuum_initialize(rel, pcxt, nworkers_table, state);
+
 	/* Success -- return parallel vacuum state */
 	return pvs;
 }
@@ -538,27 +573,41 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup
  * min_parallel_index_scan_size as invoking workers for very small indexes
  * can hurt performance.
  *
+ * XXX needs to mention about the number of workers for table.
+ *
  * nrequested is the number of parallel workers that user requested.  If
  * nrequested is 0, we compute the parallel degree based on nindexes, that is
  * the number of indexes that support parallel vacuum.  This function also
  * sets will_parallel_vacuum to remember indexes that participate in parallel
  * vacuum.
  */
-static int
-parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
-								bool *will_parallel_vacuum)
+static void
+parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes,
+								int nrequested, int *nworkers_table,
+								int *nworkers_index, bool *will_parallel_vacuum)
 {
 	int			nindexes_parallel = 0;
 	int			nindexes_parallel_bulkdel = 0;
 	int			nindexes_parallel_cleanup = 0;
-	int			parallel_workers;
+	int			parallel_workers_table = 0;
+	int			parallel_workers_index = 0;
+
+	*nworkers_table = 0;
+	*nworkers_index = 0;
 
 	/*
 	 * We don't allow performing parallel operation in standalone backend or
 	 * when parallelism is disabled.
 	 */
 	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
-		return 0;
+		return;
+
+	/*
+	 * Compute the number of workers for parallel table scan. Cap by
+	 * max_parallel_maintenance_workers.
+	 */
+	parallel_workers_table = Min(table_paralle_vacuum_compute_workers(rel, nrequested),
+								 max_parallel_maintenance_workers);
 
 	/*
 	 * Compute the number of indexes that can participate in parallel vacuum.
@@ -589,17 +638,18 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
 	nindexes_parallel--;
 
 	/* No index supports parallel vacuum */
-	if (nindexes_parallel <= 0)
-		return 0;
-
-	/* Compute the parallel degree */
-	parallel_workers = (nrequested > 0) ?
-		Min(nrequested, nindexes_parallel) : nindexes_parallel;
+	if (nindexes_parallel > 0)
+	{
+		/* Compute the parallel degree for parallel index vacuum */
+		parallel_workers_index = (nrequested > 0) ?
+			Min(nrequested, nindexes_parallel) : nindexes_parallel;
 
-	/* Cap by max_parallel_maintenance_workers */
-	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+		/* Cap by max_parallel_maintenance_workers */
+		parallel_workers_index = Min(parallel_workers_index, max_parallel_maintenance_workers);
+	}
 
-	return parallel_workers;
+	*nworkers_table = parallel_workers_table;
+	*nworkers_index = parallel_workers_index;
 }
 
 /*
@@ -669,7 +719,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
 	if (nworkers > 0)
 	{
 		/* Reinitialize parallel context to relaunch parallel workers */
-		if (num_index_scans > 0)
+		if (num_index_scans > 0 || pvs->num_table_scans > 0)
 			ReinitializeParallelDSM(pvs->pcxt);
 
 		/*
@@ -978,6 +1028,120 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
 	return true;
 }
 
+/*
+ * A parallel worker invokes table-AM specified vacuum scan callback.
+ */
+static void
+parallel_vacuum_process_table(ParallelVacuumState *pvs)
+{
+	/*
+	 * Increment the active worker count if we are able to launch any worker.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+	/* Do table vacuum scan */
+	table_parallel_vacuum_scan(pvs->heaprel, pvs, pvs->pwcxt);
+
+	/*
+	 * We have completed the table vacuum so decrement the active worker
+	 * count.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Prepare DSM and vacuum delay, and launch parallel workers for parallel
+ * table vacuum scan.
+ */
+int
+parallel_vacuum_table_scan_begin(ParallelVacuumState *pvs)
+{
+	Assert(!IsParallelWorker());
+
+	if (pvs->nworkers_for_table == 0)
+		return 0;
+
+	pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
+	pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
+
+	pvs->shared->do_vacuum_table_scan = true;
+
+	if (pvs->num_table_scans > 0)
+		ReinitializeParallelDSM(pvs->pcxt);
+
+	ReinitializeParallelWorkers(pvs->pcxt, pvs->nworkers_for_table);
+
+	LaunchParallelWorkers(pvs->pcxt);
+
+	if (pvs->pcxt->nworkers_launched > 0)
+	{
+		/*
+		 * Reset the local cost values for leader backend as we have already
+		 * accumulated the remaining balance of heap.
+		 */
+		VacuumCostBalance = 0;
+		VacuumCostBalanceLocal = 0;
+
+		/* Enable shared cost balance for leader backend */
+		VacuumSharedCostBalance = &(pvs->shared->cost_balance);
+		VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
+	}
+
+	ereport(pvs->shared->elevel,
+			(errmsg(ngettext("launched %d parallel vacuum worker for table scanning (planned: %d)",
+							 "launched %d parallel vacuum workers for table scanning (planned: %d)",
+							 pvs->pcxt->nworkers_launched),
+					pvs->pcxt->nworkers_launched, pvs->nworkers_for_table)));
+
+	return pvs->pcxt->nworkers_launched;
+}
+
+/*
+ * Wait for all workers for parallel table vacuum scan, and gather statistics.
+ */
+void
+parallel_vacuum_table_scan_end(ParallelVacuumState *pvs)
+{
+	Assert(!IsParallelWorker());
+
+	if (pvs->nworkers_for_table == 0)
+		return;
+
+	WaitForParallelWorkersToFinish(pvs->pcxt);
+
+	for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
+
+	/*
+	 * Carry the shared balance value to heap scan and disable shared costing
+	 */
+	if (VacuumSharedCostBalance)
+	{
+		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+		VacuumSharedCostBalance = NULL;
+		VacuumActiveNWorkers = NULL;
+	}
+
+	pvs->shared->do_vacuum_table_scan = false;
+	pvs->num_table_scans++;
+}
+
+Relation *
+parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes)
+{
+	*nindexes = pvs->nindexes;
+
+	return pvs->indrels;
+}
+
+int
+parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs)
+{
+	return pvs->nworkers_for_table;
+}
+
 /*
  * Perform work within a launched parallel process.
  *
@@ -1026,7 +1190,6 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	 * matched to the leader's one.
 	 */
 	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
-	Assert(nindexes > 0);
 
 	if (shared->maintenance_work_mem_worker > 0)
 		maintenance_work_mem = shared->maintenance_work_mem_worker;
@@ -1060,6 +1223,10 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	pvs.relname = pstrdup(RelationGetRelationName(rel));
 	pvs.heaprel = rel;
 
+	pvs.pwcxt = palloc(sizeof(ParallelWorkerContext));
+	pvs.pwcxt->toc = toc;
+	pvs.pwcxt->seg = seg;
+
 	/* These fields will be filled during index vacuum or cleanup */
 	pvs.indname = NULL;
 	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
@@ -1077,8 +1244,15 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	/* Prepare to track buffer usage during parallel execution */
 	InstrStartParallelQuery();
 
-	/* Process indexes to perform vacuum/cleanup */
-	parallel_vacuum_process_safe_indexes(&pvs);
+	if (pvs.shared->do_vacuum_table_scan)
+	{
+		parallel_vacuum_process_table(&pvs);
+	}
+	else
+	{
+		/* Process indexes to perform vacuum/cleanup */
+		parallel_vacuum_process_safe_indexes(&pvs);
+	}
 
 	/* Report buffer/WAL usage during parallel execution */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index af3b15e93d..63c2548c54 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -164,15 +164,6 @@ typedef struct ProcArrayStruct
  *
  * The typedef is in the header.
  */
-struct GlobalVisState
-{
-	/* XIDs >= are considered running by some backend */
-	FullTransactionId definitely_needed;
-
-	/* XIDs < are not considered to be running by any backend */
-	FullTransactionId maybe_needed;
-};
-
 /*
  * Result of ComputeXidHorizons().
  */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec88a6..a80b3a17bf 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -20,6 +20,7 @@
 #include "access/skey.h"
 #include "access/table.h"		/* for backward compatibility */
 #include "access/tableam.h"
+#include "commands/vacuum.h"
 #include "nodes/lockoptions.h"
 #include "nodes/primnodes.h"
 #include "storage/bufpage.h"
@@ -393,6 +394,13 @@ extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation rel,
 							struct VacuumParams *params, BufferAccessStrategy bstrategy);
+extern int	heap_parallel_vacuum_compute_workers(Relation rel, int requested);
+extern void heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt,
+										  int nworkers, void *state);
+extern void heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt,
+											int nworkers, void *state);
+extern void heap_parallel_vacuum_scan_worker(Relation rel, ParallelVacuumState *pvs,
+											 ParallelWorkerContext *pwcxt);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index da661289c1..e1bacc95cd 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -20,6 +20,7 @@
 #include "access/relscan.h"
 #include "access/sdir.h"
 #include "access/xact.h"
+#include "commands/vacuum.h"
 #include "executor/tuptable.h"
 #include "storage/read_stream.h"
 #include "utils/rel.h"
@@ -655,6 +656,46 @@ typedef struct TableAmRoutine
 									struct VacuumParams *params,
 									BufferAccessStrategy bstrategy);
 
+	/* ------------------------------------------------------------------------
+	 * Callbacks for parallel table vacuum.
+	 * ------------------------------------------------------------------------
+	 */
+
+	/*
+	 * Compute the number of parallel workers for parallel table vacuum. The
+	 * function must return 0 to disable parallel table vacuum.
+	 */
+	int			(*parallel_vacuum_compute_workers) (Relation rel, int requested);
+
+	/*
+	 * Compute the amount of DSM space AM need in the parallel table vacuum.
+	 *
+	 * Not called if parallel table vacuum is disabled.
+	 */
+	void		(*parallel_vacuum_estimate) (Relation rel,
+											 ParallelContext *pcxt,
+											 int nworkers,
+											 void *state);
+
+	/*
+	 * Initialize DSM space for parallel table vacuum.
+	 *
+	 * Not called if parallel table vacuum is disabled.
+	 */
+	void		(*parallel_vacuum_initialize) (Relation rel,
+											   ParallelContext *pctx,
+											   int nworkers,
+											   void *state);
+
+	/*
+	 * This callback is called for parallel table vacuum workers.
+	 *
+	 * Not called if parallel table vacuum is disabled.
+	 */
+	void		(*parallel_vacuum_scan_worker) (Relation rel,
+												ParallelVacuumState *pvs,
+												ParallelWorkerContext *pwcxt);
+
 	/*
 	 * Prepare to analyze block `blockno` of `scan`. The scan has been started
 	 * with table_beginscan_analyze().  See also
@@ -1710,6 +1751,33 @@ table_relation_vacuum(Relation rel, struct VacuumParams *params,
 	rel->rd_tableam->relation_vacuum(rel, params, bstrategy);
 }
 
+static inline int
+table_paralle_vacuum_compute_workers(Relation rel, int requested)
+{
+	return rel->rd_tableam->parallel_vacuum_compute_workers(rel, requested);
+}
+
+static inline void
+table_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers,
+							   void *state)
+{
+	rel->rd_tableam->parallel_vacuum_estimate(rel, pcxt, nworkers, state);
+}
+
+static inline void
+table_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, int nworkers,
+								 void *state)
+{
+	rel->rd_tableam->parallel_vacuum_initialize(rel, pcxt, nworkers, state);
+}
+
+static inline void
+table_parallel_vacuum_scan(Relation rel, ParallelVacuumState *pvs,
+						   ParallelWorkerContext *pwcxt)
+{
+	rel->rd_tableam->parallel_vacuum_scan_worker(rel, pvs, pwcxt);
+}
+
 /*
  * Prepare to analyze the next block in the read stream. The scan needs to
  * have been  started with table_beginscan_analyze().  Note that this routine
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 759f9a87d3..e665335b6f 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -360,7 +360,8 @@ extern void VacuumUpdateCosts(void);
 extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels,
 												 int nindexes, int nrequested_workers,
 												 int vac_work_mem, int elevel,
-												 BufferAccessStrategy bstrategy);
+												 BufferAccessStrategy bstrategy,
+												 void *state);
 extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats);
 extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs,
 												VacDeadItemsInfo **dead_items_info_p);
@@ -372,6 +373,10 @@ extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs,
 												long num_table_tuples,
 												int num_index_scans,
 												bool estimated_count);
+extern int	parallel_vacuum_table_scan_begin(ParallelVacuumState *pvs);
+extern void parallel_vacuum_table_scan_end(ParallelVacuumState *pvs);
+extern int	parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs);
+extern Relation *parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes);
 extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in commands/analyze.c */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 9398a84051..6ccb19a29f 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -102,8 +102,20 @@ extern char *ExportSnapshot(Snapshot snapshot);
 /*
  * These live in procarray.c because they're intimately linked to the
  * procarray contents, but thematically they better fit into snapmgr.h.
+ *
+ * XXX the struct definition is temporarily moved from procarray.c for
+ * parallel table vacuum development. We need to find a suitable way for
+ * parallel table vacuum workers to share the GlobalVisState.
  */
-typedef struct GlobalVisState GlobalVisState;
+typedef struct GlobalVisState
+{
+	/* XIDs >= are considered running by some backend */
+	FullTransactionId definitely_needed;
+
+	/* XIDs < are not considered to be running by any backend */
+	FullTransactionId maybe_needed;
+} GlobalVisState;
+
 extern GlobalVisState *GlobalVisTestFor(Relation rel);
 extern bool GlobalVisTestIsRemovableXid(GlobalVisState *state, TransactionId xid);
 extern bool GlobalVisTestIsRemovableFullXid(GlobalVisState *state, FullTransactionId fxid);
