diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index e712226..59e90cb 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -30,6 +30,7 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="PARAMET
     FREEZE
     VERBOSE
     ANALYZE
+    PARALLEL <replaceable class="PARAMETER">N</replaceable>
     DISABLE_PAGE_SKIPPING
 
 <phrase>and <replaceable class="PARAMETER">table_and_columns</replaceable> is:</phrase>
@@ -142,6 +143,20 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="PARAMET
    </varlistentry>
 
    <varlistentry>
+    <term><literal>PARALLEL <replaceable class="PARAMETER">N</replaceable></literal></term>
+    <listitem>
+     <para>
+      Execute <command>VACUUM</command> in parallel using <replaceable
+      class="PARAMETER">N</replaceable> background workers. Garbage collection
+      on the table is performed in parallel at the block level. For tables with
+      indexes, each index is assigned to a parallel vacuum worker, and all
+      garbage in an index is processed by that particular worker. This option
+      cannot be used together with the <literal>FULL</literal> option.
+     </para>
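+     <para>
+      For example, <literal>VACUUM (PARALLEL 2) vactbl;</literal> vacuums a
+      table named <literal>vactbl</literal> using two background workers.
+     </para>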
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><literal>DISABLE_PAGE_SKIPPING</literal></term>
     <listitem>
      <para>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index c435482..8e64650 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -91,7 +91,6 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
 						bool is_samplescan,
 						bool temp_snap);
 static void heap_parallelscan_startblock_init(HeapScanDesc scan);
-static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 					TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -1715,7 +1714,7 @@ retry:
  *		first backend gets an InvalidBlockNumber return.
  * ----------------
  */
-static BlockNumber
+BlockNumber
 heap_parallelscan_nextpage(HeapScanDesc scan)
 {
 	BlockNumber page;
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index c6f7b7a..dd22bb9 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -20,6 +20,7 @@
 #include "access/xlog.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
+#include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -122,6 +123,9 @@ static const struct
 {
 	{
 		"ParallelQueryMain", ParallelQueryMain
+	},
+	{
+		"LazyVacuumWorkerMain", LazyVacuumWorkerMain
 	}
 };
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index f439b55..4c9ac4f 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -74,7 +74,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
 				  MultiXactId minMulti,
 				  TransactionId lastSaneFrozenXid,
 				  MultiXactId lastSaneMinMulti);
-static bool vacuum_rel(Oid relid, RangeVar *relation, int options,
+static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumOptions options,
 		   VacuumParams *params);
 
 /*
@@ -89,15 +89,15 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
 	VacuumParams params;
 
 	/* sanity checks on options */
-	Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE));
-	Assert((vacstmt->options & VACOPT_VACUUM) ||
-		   !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE)));
-	Assert(!(vacstmt->options & VACOPT_SKIPTOAST));
+	Assert(vacstmt->options.flags & (VACOPT_VACUUM | VACOPT_ANALYZE));
+	Assert((vacstmt->options.flags & VACOPT_VACUUM) ||
+		   !(vacstmt->options.flags & (VACOPT_FULL | VACOPT_FREEZE)));
+	Assert(!(vacstmt->options.flags & VACOPT_SKIPTOAST));
 
 	/*
 	 * Make sure VACOPT_ANALYZE is specified if any column lists are present.
 	 */
-	if (!(vacstmt->options & VACOPT_ANALYZE))
+	if (!(vacstmt->options.flags & VACOPT_ANALYZE))
 	{
 		ListCell   *lc;
 
@@ -116,7 +116,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
 	 * All freeze ages are zero if the FREEZE option is given; otherwise pass
 	 * them as -1 which means to use the default values.
 	 */
-	if (vacstmt->options & VACOPT_FREEZE)
+	if (vacstmt->options.flags & VACOPT_FREEZE)
 	{
 		params.freeze_min_age = 0;
 		params.freeze_table_age = 0;
@@ -163,7 +163,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
  * memory context that will not disappear at transaction commit.
  */
 void
-vacuum(int options, List *relations, VacuumParams *params,
+vacuum(VacuumOptions options, List *relations, VacuumParams *params,
 	   BufferAccessStrategy bstrategy, bool isTopLevel)
 {
 	static bool in_vacuum = false;
@@ -174,7 +174,7 @@ vacuum(int options, List *relations, VacuumParams *params,
 
 	Assert(params != NULL);
 
-	stmttype = (options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
+	stmttype = (options.flags & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
 
 	/*
 	 * We cannot run VACUUM inside a user transaction block; if we were inside
@@ -184,7 +184,7 @@ vacuum(int options, List *relations, VacuumParams *params,
 	 *
 	 * ANALYZE (without VACUUM) can run either way.
 	 */
-	if (options & VACOPT_VACUUM)
+	if (options.flags & VACOPT_VACUUM)
 	{
 		PreventTransactionChain(isTopLevel, stmttype);
 		in_outer_xact = false;
@@ -206,17 +206,26 @@ vacuum(int options, List *relations, VacuumParams *params,
 	/*
 	 * Sanity check DISABLE_PAGE_SKIPPING option.
 	 */
-	if ((options & VACOPT_FULL) != 0 &&
-		(options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
+	if ((options.flags & VACOPT_FULL) != 0 &&
+		(options.flags & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL")));
 
 	/*
+	 * Sanity check PARALLEL option.
+	 */
+	if ((options.flags & VACOPT_FULL) != 0 &&
+		(options.flags & VACOPT_PARALLEL) != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("VACUUM option PARALLEL cannnot be used with FULL")));
+
+	/*
 	 * Send info about dead objects to the statistics collector, unless we are
 	 * in autovacuum --- autovacuum.c does this for itself.
 	 */
-	if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+	if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
 		pgstat_vacuum_stat();
 
 	/*
@@ -281,11 +290,11 @@ vacuum(int options, List *relations, VacuumParams *params,
 	 * transaction block, and also in an autovacuum worker, use own
 	 * transactions so we can release locks sooner.
 	 */
-	if (options & VACOPT_VACUUM)
+	if (options.flags & VACOPT_VACUUM)
 		use_own_xacts = true;
 	else
 	{
-		Assert(options & VACOPT_ANALYZE);
+		Assert(options.flags & VACOPT_ANALYZE);
 		if (IsAutoVacuumWorkerProcess())
 			use_own_xacts = true;
 		else if (in_outer_xact)
@@ -335,13 +344,13 @@ vacuum(int options, List *relations, VacuumParams *params,
 		{
 			VacuumRelation *vrel = lfirst_node(VacuumRelation, cur);
 
-			if (options & VACOPT_VACUUM)
+			if (options.flags & VACOPT_VACUUM)
 			{
 				if (!vacuum_rel(vrel->oid, vrel->relation, options, params))
 					continue;
 			}
 
-			if (options & VACOPT_ANALYZE)
+			if (options.flags & VACOPT_ANALYZE)
 			{
 				/*
 				 * If using separate xacts, start one for analyze. Otherwise,
@@ -354,7 +363,7 @@ vacuum(int options, List *relations, VacuumParams *params,
 					PushActiveSnapshot(GetTransactionSnapshot());
 				}
 
-				analyze_rel(vrel->oid, vrel->relation, options, params,
+				analyze_rel(vrel->oid, vrel->relation, options.flags, params,
 							vrel->va_cols, in_outer_xact, vac_strategy);
 
 				if (use_own_xacts)
@@ -390,7 +399,7 @@ vacuum(int options, List *relations, VacuumParams *params,
 		StartTransactionCommand();
 	}
 
-	if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+	if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
 	{
 		/*
 		 * Update pg_database.datfrozenxid, and truncate pg_xact if possible.
@@ -1318,7 +1327,7 @@ vac_truncate_clog(TransactionId frozenXID,
  *		At entry and exit, we are not inside a transaction.
  */
 static bool
-vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
+vacuum_rel(Oid relid, RangeVar *relation, VacuumOptions options, VacuumParams *params)
 {
 	LOCKMODE	lmode;
 	Relation	onerel;
@@ -1339,7 +1348,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 */
 	PushActiveSnapshot(GetTransactionSnapshot());
 
-	if (!(options & VACOPT_FULL))
+	if (!(options.flags & VACOPT_FULL))
 	{
 		/*
 		 * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
@@ -1379,7 +1388,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
 	 * way, we can be sure that no other backend is vacuuming the same table.
 	 */
-	lmode = (options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;
+	lmode = (options.flags & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;
 
 	/*
 	 * Open the relation and get the appropriate lock on it.
@@ -1390,7 +1399,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 * If we've been asked not to wait for the relation lock, acquire it first
 	 * in non-blocking mode, before calling try_relation_open().
 	 */
-	if (!(options & VACOPT_NOWAIT))
+	if (!(options.flags & VACOPT_NOWAIT))
 		onerel = try_relation_open(relid, lmode);
 	else if (ConditionalLockRelationOid(relid, lmode))
 		onerel = try_relation_open(relid, NoLock);
@@ -1510,7 +1519,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	 * us to process it.  In VACUUM FULL, though, the toast table is
 	 * automatically rebuilt by cluster_rel so we shouldn't recurse to it.
 	 */
-	if (!(options & VACOPT_SKIPTOAST) && !(options & VACOPT_FULL))
+	if (!(options.flags & VACOPT_SKIPTOAST) && !(options.flags & VACOPT_FULL))
 		toast_relid = onerel->rd_rel->reltoastrelid;
 	else
 		toast_relid = InvalidOid;
@@ -1529,7 +1538,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	/*
 	 * Do the actual work --- either FULL or "lazy" vacuum
 	 */
-	if (options & VACOPT_FULL)
+	if (options.flags & VACOPT_FULL)
 	{
 		/* close relation before vacuuming, but hold lock until commit */
 		relation_close(onerel, NoLock);
@@ -1537,7 +1546,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 
 		/* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
 		cluster_rel(relid, InvalidOid, false,
-					(options & VACOPT_VERBOSE) != 0);
+					(options.flags & VACOPT_VERBOSE) != 0);
 	}
 	else
 		lazy_vacuum_rel(onerel, options, params, vac_strategy);
@@ -1591,8 +1600,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
  * hit dangling index pointers.
  */
 void
-vac_open_indexes(Relation relation, LOCKMODE lockmode,
-				 int *nindexes, Relation **Irel)
+vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
 {
 	List	   *indexoidlist;
 	ListCell   *indexoidscan;
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 30b1c08..f9be38e 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -22,6 +22,20 @@
  * of index scans performed.  So we don't use maintenance_work_mem memory for
  * the TID array, just enough to hold as many heap tuples as fit on one page.
  *
+ * In PostgreSQL 11, we support a parallel option for lazy vacuum. In parallel
+ * lazy vacuum, multiple vacuum worker processes fetch blocks in parallel using
+ * a parallel heap scan and process them. If the table has indexes, the
+ * parallel vacuum workers also vacuum the heap and the indexes in parallel.
+ * Since the dead tuple TIDs are shared by all vacuum processes, including the
+ * leader process, the parallel vacuum processes have two synchronization
+ * points in lazy vacuum processing: before starting to vacuum and before
+ * clearing the dead tuple TIDs. At these two points each process waits for
+ * all the other vacuum processes to arrive. The information required by
+ * parallel lazy vacuum, such as the table statistics and the parallel heap
+ * scan descriptor, has to be shared with all vacuum processes; the table
+ * statistics are gathered by the leader process after the workers finish.
+ * Note that the dead tuple TIDs need to be shared only when the table has
+ * indexes; for a table with no indexes, each worker vacuums blocks independently.
  *
  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -41,8 +55,10 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/relscan.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
@@ -54,6 +70,7 @@
 #include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
@@ -62,6 +79,7 @@
 #include "utils/timestamp.h"
 #include "utils/tqual.h"
 
+/* #define PLV_TIME */		/* uncomment to print per-phase timing for debugging */
 
 /*
  * Space/time tradeoff parameters: do these need to be user-tunable?
@@ -103,10 +121,81 @@
  */
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
+/* DSM key for parallel lazy vacuum */
+#define VACUUM_KEY_PARALLEL_SCAN	UINT64CONST(0xFFFFFFFFFFF00001)
+#define VACUUM_KEY_VACUUM_STATS		UINT64CONST(0xFFFFFFFFFFF00002)
+#define VACUUM_KEY_INDEX_STATS	    UINT64CONST(0xFFFFFFFFFFF00003)
+#define VACUUM_KEY_DEAD_TUPLE_CTL	UINT64CONST(0xFFFFFFFFFFF00004)
+#define VACUUM_KEY_DEAD_TUPLES		UINT64CONST(0xFFFFFFFFFFF00005)
+#define VACUUM_KEY_PARALLEL_STATE	UINT64CONST(0xFFFFFFFFFFF00006)
+
+/*
+ * See the note in lazy_scan_get_nextpage about forcing the scan of the
+ * last page.
+ */
+#define FORCE_CHECK_PAGE(blk) \
+	(blkno == (blk - 1) && should_attempt_truncation(vacrelstats))
+
+/* Check if given index is assigned to this parallel vacuum worker */
+#define IsAssignedIndex(i, pstate) \
+	(pstate == NULL || \
+	 ((((i) % ((LVParallelState *) (pstate))->nworkers) - 1) == ParallelWorkerNumber))
+
+#define IsDeadTupleShared(lvstate) \
+	((LVState *)(lvstate))->parallel_mode && \
+	((LVState *)(lvstate))->vacrelstats->nindexes > 0
+
+/* Vacuum worker state for parallel lazy vacuum */
+#define VACSTATE_SCAN			0x1	/* heap scan phase */
+#define VACSTATE_VACUUM			0x2	/* vacuuming on table and index */
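+
+/*
+ * In parallel lazy vacuum on a table with indexes, every participating
+ * process alternates between the scan phase and the vacuum phase.
+ * lazy_prepare_vacuum() and lazy_end_vacuum() act as the barriers between
+ * the two phases: the process that reaches a barrier first waits there
+ * until all the other vacuum processes arrive.
+ */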
+
+/*
+ * Vacuum relevant options and thresholds we need share with parallel
+ * vacuum workers.
+ */
+typedef struct VacuumInfo
+{
+	int				options;	/* VACUUM options */
+	bool			aggressive;	/* do the workers need to vacuum aggressively? */
+	TransactionId	oldestxmin;
+	TransactionId	freezelimit;
+	MultiXactId		multixactcutoff;
+	int				elevel;
+} VacuumInfo;
+
+/* Struct for index statistics that are used for parallel lazy vacuum */
+typedef struct LVIndStats
+{
+	bool		updated;	/* need to be updated? */
+	BlockNumber	num_pages;
+	BlockNumber	num_tuples;
+} LVIndStats;
+
+/* Struct for parallel lazy vacuum state */
+typedef struct LVParallelState
+{
+	int nworkers;			/* # of processes doing vacuum */
+	VacuumInfo	info;
+	int	state;				/* current parallel vacuum status */
+	int	finish_count;
+	ConditionVariable cv;
+	slock_t	mutex;
+} LVParallelState;
+
+/* Struct for control dead tuple TIDs array */
+typedef struct LVDeadTupleCtl
+{
+	int			dt_max;	/* # slots allocated in array */
+	int 		dt_count; /* # of dead tuples */
+
+	/* Used only for parallel lazy vacuum */
+	int			dt_index;	/* next dead tuple to process in the heap vacuum pass */
+	slock_t 	mutex;
+} LVDeadTupleCtl;
+
 typedef struct LVRelStats
 {
-	/* hasindex = true means two-pass strategy; false means one-pass */
-	bool		hasindex;
+	int			nindexes; /* > 0 means two-pass strategy; = 0 means one-pass */
 	/* Overall statistics about rel */
 	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
 	BlockNumber rel_pages;		/* total number of pages */
@@ -118,19 +207,46 @@ typedef struct LVRelStats
 	double		old_rel_tuples; /* previous value of pg_class.reltuples */
 	double		new_rel_tuples; /* new estimated total # of tuples */
 	double		new_dead_tuples;	/* new estimated total # of dead tuples */
-	BlockNumber pages_removed;
 	double		tuples_deleted;
-	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
-	/* List of TIDs of tuples we intend to delete */
-	/* NB: this list is ordered by TID address */
-	int			num_dead_tuples;	/* current # of entries */
-	int			max_dead_tuples;	/* # slots allocated in array */
-	ItemPointer dead_tuples;	/* array of ItemPointerData */
 	int			num_index_scans;
+	BlockNumber pages_removed;
+	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
 	TransactionId latestRemovedXid;
 	bool		lock_waiter_detected;
 } LVRelStats;
 
+/* Struct for lazy vacuum execution */
+typedef struct LVState
+{
+	bool		parallel_mode;
+	LVRelStats *vacrelstats;
+	/*
+	 * Used in both parallel and non-parallel lazy vacuum. In parallel lazy
+	 * vacuum on a table with indexes, dtctl and deadtuples point into dynamic
+	 * shared memory, and access to them is controlled through dtctl.
+	 */
+	LVDeadTupleCtl	*dtctl;
+	ItemPointer	deadtuples;
+
+	/* Used only for parallel lazy vacuum */
+	ParallelContext *pcxt;
+	LVParallelState *pstate;
+	ParallelHeapScanDesc pscan;
+	LVIndStats *indstats;
+} LVState;
+
+/*
+ * Scan description data for lazy vacuum. In parallel lazy vacuum,
+ * the next block number is obtained through heapscan instead.
+ */
+typedef struct LVScanDescData
+{
+	BlockNumber lv_cblock;					/* current scanning block number */
+	BlockNumber lv_next_unskippable_block;	/* next block number we cannot skip */
+	BlockNumber lv_nblocks;					/* number of blocks in the relation */
+	HeapScanDesc heapscan;					/* used for parallel lazy vacuum */
+} LVScanDescData;
+typedef struct LVScanDescData *LVScanDesc;
 
 /* A few variables that don't seem worth passing around as parameters */
 static int	elevel = -1;
@@ -141,32 +257,47 @@ static MultiXactId MultiXactCutoff;
 
 static BufferAccessStrategy vac_strategy;
 
-
-/* non-export function prototypes */
-static void lazy_scan_heap(Relation onerel, int options,
-			   LVRelStats *vacrelstats, Relation *Irel, int nindexes,
-			   bool aggressive);
-static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
+/* non-export function prototypes */
+static void lazy_vacuum_heap(Relation onerel, LVState *lvstate);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
 static void lazy_vacuum_index(Relation indrel,
 				  IndexBulkDeleteResult **stats,
-				  LVRelStats *vacrelstats);
-static void lazy_cleanup_index(Relation indrel,
-				   IndexBulkDeleteResult *stats,
-				   LVRelStats *vacrelstats);
+							  LVState *lvstate);
+static void lazy_cleanup_index(Relation indrel, IndexBulkDeleteResult *stats,
+							   LVRelStats *vacrelstats, LVIndStats *indstat);
 static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
+				 int tupindex, LVState *lvstate, Buffer *vmbuffer);
 static bool should_attempt_truncation(LVRelStats *vacrelstats);
 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 						 LVRelStats *vacrelstats);
-static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
-static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
-					   ItemPointer itemptr);
+static void lazy_space_alloc(LVState *lvstate, BlockNumber relblocks);
+static void lazy_record_dead_tuple(LVState *state, ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(Relation rel, Buffer buf,
 						 TransactionId *visibility_cutoff_xid, bool *all_frozen);
+static void do_lazy_scan_heap(LVState *lvstate, Relation onerel, Relation *Irels,
+							  int nindexes, int options, bool aggressive);
+static void lazy_scan_heap(Relation rel, LVState *lvstate, VacuumOptions options,
+						   bool aggressive);
+
+/* function prototypes for parallel vacuum */
+static void lazy_gather_vacuum_stats(ParallelContext *pcxt,
+									 LVRelStats *vacrelstats);
+static void lazy_estimate_dsm(ParallelContext *pcxt, LVRelStats *vacrelstats);
+static void lazy_initialize_dsm(ParallelContext *pcxt, Relation onrel,
+								LVState *lvstate, int options, bool aggressive);
+static LVState *lazy_initialize_worker(shm_toc *toc);
+static LVScanDesc lv_beginscan(Relation onerel, ParallelHeapScanDesc pscan);
+static void lv_endscan(LVScanDesc lvscan);
+static BlockNumber lazy_scan_get_nextpage(Relation onerel, LVState *lvstate,
+											   LVScanDesc lvscan,
+											   bool *all_visible_according_to_vm,
+											   Buffer *vmbuffer, int options, bool aggressive);
+static void lazy_prepare_vacuum(LVState *lvstate);
+static void lazy_end_vacuum(LVState *lvstate);
+static long lazy_get_max_dead_tuples(LVRelStats *vacrelstats);
 
 
 /*
@@ -179,12 +310,11 @@ static bool heap_page_is_all_visible(Relation rel, Buffer buf,
  *		and locked the relation.
  */
 void
-lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
+lazy_vacuum_rel(Relation onerel, VacuumOptions options, VacuumParams *params,
 				BufferAccessStrategy bstrategy)
 {
-	LVRelStats *vacrelstats;
-	Relation   *Irel;
-	int			nindexes;
+	LVState		*lvstate;
+	LVRelStats	*vacrelstats;
 	PGRUsage	ru0;
 	TimestampTz starttime = 0;
 	long		secs;
@@ -211,7 +341,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 		starttime = GetCurrentTimestamp();
 	}
 
-	if (options & VACOPT_VERBOSE)
+	if (options.flags & VACOPT_VERBOSE)
 		elevel = INFO;
 	else
 		elevel = DEBUG2;
@@ -239,10 +369,12 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 											   xidFullScanLimit);
 	aggressive |= MultiXactIdPrecedesOrEquals(onerel->rd_rel->relminmxid,
 											  mxactFullScanLimit);
-	if (options & VACOPT_DISABLE_PAGE_SKIPPING)
+	if (options.flags & VACOPT_DISABLE_PAGE_SKIPPING)
 		aggressive = true;
 
+	lvstate = (LVState *) palloc0(sizeof(LVState));
 	vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));
+	lvstate->vacrelstats = vacrelstats;
 
 	vacrelstats->old_rel_pages = onerel->rd_rel->relpages;
 	vacrelstats->old_rel_tuples = onerel->rd_rel->reltuples;
@@ -250,15 +382,8 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	vacrelstats->pages_removed = 0;
 	vacrelstats->lock_waiter_detected = false;
 
-	/* Open all indexes of the relation */
-	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
-	vacrelstats->hasindex = (nindexes > 0);
-
 	/* Do the vacuuming */
-	lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive);
-
-	/* Done with indexes */
-	vac_close_indexes(nindexes, Irel, NoLock);
+	lazy_scan_heap(onerel, lvstate, options, aggressive);
 
 	/*
 	 * Compute whether we actually scanned the all unfrozen pages. If we did,
@@ -267,7 +392,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	 * NB: We need to check this before truncating the relation, because that
 	 * will change ->rel_pages.
 	 */
-	if ((vacrelstats->scanned_pages + vacrelstats->frozenskipped_pages)
+	if ((lvstate->vacrelstats->scanned_pages + vacrelstats->frozenskipped_pages)
 		< vacrelstats->rel_pages)
 	{
 		Assert(!aggressive);
@@ -329,7 +454,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 						new_rel_pages,
 						new_rel_tuples,
 						new_rel_allvisible,
-						vacrelstats->hasindex,
+						(vacrelstats->nindexes != 0),
 						new_frozen_xid,
 						new_min_multi,
 						false);
@@ -439,28 +564,166 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 }
 
 /*
- *	lazy_scan_heap() -- scan an open heap relation
+ * If the number of workers specified is more than 0, we enter parallel lazy
+ * vacuum mode. In parallel lazy vacuum mode, we initialize a dynamic shared
+ * memory segment and launch parallel vacuum workers. The leader process also
+ * vacuums the table after launching the workers and then waits for all vacuum
+ * workers to finish. After all vacuum workers have finished, we gather the
+ * vacuum statistics of the table and indexes, and update them.
+ */
+static void
+lazy_scan_heap(Relation onerel, LVState *lvstate, VacuumOptions options,
+			   bool aggressive)
+{
+	ParallelContext	*pcxt;
+	LVRelStats	*vacrelstats = lvstate->vacrelstats;
+	Relation	*Irel;
+	int			nindexes;
+
+	lvstate->parallel_mode = options.nworkers > 0;
+
+	/* Open indexes */
+	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
+	vacrelstats->nindexes = nindexes;
+
+	if (lvstate->parallel_mode)
+	{
+		EnterParallelMode();
+
+		/* Create parallel context and initialize it */
+		pcxt = CreateParallelContext("postgres", "LazyVacuumWorkerMain",
+									 options.nworkers);
+		lvstate->pcxt = pcxt;
+
+		/* Estimate DSM size for parallel vacuum */
+		lazy_estimate_dsm(pcxt, lvstate->vacrelstats);
+
+		/* Initialize DSM for parallel vacuum */
+		InitializeParallelDSM(pcxt);
+		lazy_initialize_dsm(pcxt, onerel, lvstate, options.flags, aggressive);
+
+		/* Launch workers */
+		LaunchParallelWorkers(pcxt);
+	}
+
+	do_lazy_scan_heap(lvstate, onerel, Irel, nindexes, options.flags, aggressive);
+
+	/*
+	 * We can update relation statistics such as scanned pages only after
+	 * gathering statistics from all workers. Also, in parallel mode the workers
+	 * cannot update index statistics concurrently, so the leader has to do it.
+	 *
+	 * XXX : If we allowed workers to update statistics tuples concurrently,
+	 * updating the index statistics could be done in lazy_cleanup_index().
+	 */
+	if (lvstate->parallel_mode)
+	{
+		int i;
+		LVIndStats *indstats = palloc(sizeof(LVIndStats) * lvstate->vacrelstats->nindexes);
+
+		/* Wait for all workers to finish vacuuming */
+		WaitForParallelWorkersToFinish(pcxt);
+
+		/* Gather the result of vacuum statistics from all workers */
+		lazy_gather_vacuum_stats(pcxt, vacrelstats);
+
+		/* Now we can compute the new value for pg_class.reltuples */
+		vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
+															 vacrelstats->rel_pages,
+															 vacrelstats->scanned_pages,
+															 vacrelstats->scanned_tuples);
+
+		/* Copy new index stats to local memory */
+		memcpy(indstats, lvstate->indstats, sizeof(LVIndStats) * vacrelstats->nindexes);
+
+		DestroyParallelContext(pcxt);
+		ExitParallelMode();
+
+		/* After exiting parallel mode, update index statistics */
+		for (i = 0; i < vacrelstats->nindexes; i++)
+		{
+			Relation	ind = Irel[i];
+			LVIndStats *indstat = (LVIndStats *) &(indstats[i]);
+
+			if (indstat->updated)
+			   vac_update_relstats(ind,
+								   indstat->num_pages,
+								   indstat->num_tuples,
+								   0,
+								   false,
+								   InvalidTransactionId,
+								   InvalidMultiXactId,
+								   false);
+		}
+	}
+
+	vac_close_indexes(nindexes, Irel, RowExclusiveLock);
+}
+
+/*
+ * Entry point of parallel vacuum worker.
+ */
+void
+LazyVacuumWorkerMain(dsm_segment *seg, shm_toc *toc)
+{
+	LVState		*lvstate;
+	Relation rel;
+	Relation *indrel;
+	int nindexes_worker;
+
+	/* Look up dynamic shared memory and initialize */
+	lvstate = lazy_initialize_worker(toc);
+
+	Assert(lvstate != NULL);
+
+	rel = relation_open(lvstate->pscan->phs_relid, ShareUpdateExclusiveLock);
+
+	/* Open all indexes */
+	vac_open_indexes(rel, RowExclusiveLock, &nindexes_worker,
+					 &indrel);
+
+	/* Do lazy vacuum */
+	do_lazy_scan_heap(lvstate, rel, indrel, lvstate->vacrelstats->nindexes,
+					  lvstate->pstate->info.options, lvstate->pstate->info.aggressive);
+
+	vac_close_indexes(lvstate->vacrelstats->nindexes, indrel, RowExclusiveLock);
+	heap_close(rel, ShareUpdateExclusiveLock);
+}
+
+/*
+ *	do_lazy_scan_heap() -- scan an open heap relation
  *
  *		This routine prunes each page in the heap, which will among other
  *		things truncate dead tuples to dead line pointers, defragment the
- *		page, and set commit status bits (see heap_page_prune).  It also builds
+ *		page, and set commit status bits (see heap_page_prune).  It also uses
  *		lists of dead tuples and pages with free space, calculates statistics
  *		on the number of live tuples in the heap, and marks pages as
  *		all-visible if appropriate.  When done, or when we run low on space for
- *		dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
- *		to reclaim dead line pointers.
+ *		dead-tuple TIDs, invoke vacuuming of the assigned indexes and call
+ *		lazy_vacuum_heap to reclaim dead line pointers. In parallel vacuum, we
+ *		need to synchronize at the points where scanning the heap finishes and
+ *		where vacuuming the heap finishes. The vacuum worker that reaches such
+ *		a point first has to wait for the other vacuum workers to reach it too.
+ *
+ *		In a parallel lazy scan, we get the next page number using the parallel
+ *		heap scan. Since the dead tuple TIDs are shared with all vacuum workers,
+ *		we have to wait for all other workers to reach the same points: before
+ *		starting to reclaim dead tuple TIDs and before clearing the dead tuple
+ *		TID information in dynamic shared memory.
  *
  *		If there are no indexes then we can reclaim line pointers on the fly;
  *		dead line pointers need only be retained until all index pointers that
  *		reference them have been killed.
  */
 static void
-lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
-			   Relation *Irel, int nindexes, bool aggressive)
+do_lazy_scan_heap(LVState *lvstate, Relation onerel, Relation *Irel,
+				  int nindexes, int options, bool aggressive)
 {
-	BlockNumber nblocks,
-				blkno;
+	LVRelStats *vacrelstats = lvstate->vacrelstats;
+	BlockNumber blkno;
+	BlockNumber nblocks;
 	HeapTupleData tuple;
+	LVScanDesc lvscan;
 	char	   *relname;
 	BlockNumber empty_pages,
 				vacuumed_pages;
@@ -471,11 +734,15 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	IndexBulkDeleteResult **indstats;
 	int			i;
 	PGRUsage	ru0;
+#ifdef PLV_TIME
+	PGRUsage	ru_scan;
+	PGRUsage	ru_vacuum;
+#endif
 	Buffer		vmbuffer = InvalidBuffer;
-	BlockNumber next_unskippable_block;
-	bool		skipping_blocks;
 	xl_heap_freeze_tuple *frozen;
 	StringInfoData buf;
+	bool		all_visible_according_to_vm = false;
+
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -504,89 +771,24 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	vacrelstats->nonempty_pages = 0;
 	vacrelstats->latestRemovedXid = InvalidTransactionId;
 
-	lazy_space_alloc(vacrelstats, nblocks);
+	lazy_space_alloc(lvstate, nblocks);
 	frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
 
+	/* Begin heap scan for vacuum */
+	lvscan = lv_beginscan(onerel, lvstate->pscan);
+
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
 	initprog_val[1] = nblocks;
-	initprog_val[2] = vacrelstats->max_dead_tuples;
+	initprog_val[2] = lvstate->dtctl->dt_max;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
-	/*
-	 * Except when aggressive is set, we want to skip pages that are
-	 * all-visible according to the visibility map, but only when we can skip
-	 * at least SKIP_PAGES_THRESHOLD consecutive pages.  Since we're reading
-	 * sequentially, the OS should be doing readahead for us, so there's no
-	 * gain in skipping a page now and then; that's likely to disable
-	 * readahead and so be counterproductive. Also, skipping even a single
-	 * page means that we can't update relfrozenxid, so we only want to do it
-	 * if we can skip a goodly number of pages.
-	 *
-	 * When aggressive is set, we can't skip pages just because they are
-	 * all-visible, but we can still skip pages that are all-frozen, since
-	 * such pages do not need freezing and do not affect the value that we can
-	 * safely set for relfrozenxid or relminmxid.
-	 *
-	 * Before entering the main loop, establish the invariant that
-	 * next_unskippable_block is the next block number >= blkno that we can't
-	 * skip based on the visibility map, either all-visible for a regular scan
-	 * or all-frozen for an aggressive scan.  We set it to nblocks if there's
-	 * no such block.  We also set up the skipping_blocks flag correctly at
-	 * this stage.
-	 *
-	 * Note: The value returned by visibilitymap_get_status could be slightly
-	 * out-of-date, since we make this test before reading the corresponding
-	 * heap page or locking the buffer.  This is OK.  If we mistakenly think
-	 * that the page is all-visible or all-frozen when in fact the flag's just
-	 * been cleared, we might fail to vacuum the page.  It's easy to see that
-	 * skipping a page when aggressive is not set is not a very big deal; we
-	 * might leave some dead tuples lying around, but the next vacuum will
-	 * find them.  But even when aggressive *is* set, it's still OK if we miss
-	 * a page whose all-frozen marking has just been cleared.  Any new XIDs
-	 * just added to that page are necessarily newer than the GlobalXmin we
-	 * computed, so they'll have no effect on the value to which we can safely
-	 * set relfrozenxid.  A similar argument applies for MXIDs and relminmxid.
-	 *
-	 * We will scan the table's last page, at least to the extent of
-	 * determining whether it has tuples or not, even if it should be skipped
-	 * according to the above rules; except when we've already determined that
-	 * it's not worth trying to truncate the table.  This avoids having
-	 * lazy_truncate_heap() take access-exclusive lock on the table to attempt
-	 * a truncation that just fails immediately because there are tuples in
-	 * the last page.  This is worth avoiding mainly because such a lock must
-	 * be replayed on any hot standby, where it can be disruptive.
-	 */
-	next_unskippable_block = 0;
-	if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
-	{
-		while (next_unskippable_block < nblocks)
-		{
-			uint8		vmstatus;
-
-			vmstatus = visibilitymap_get_status(onerel, next_unskippable_block,
-												&vmbuffer);
-			if (aggressive)
-			{
-				if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
-					break;
-			}
-			else
-			{
-				if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
-					break;
-			}
-			vacuum_delay_point();
-			next_unskippable_block++;
-		}
-	}
-
-	if (next_unskippable_block >= SKIP_PAGES_THRESHOLD)
-		skipping_blocks = true;
-	else
-		skipping_blocks = false;
-
-	for (blkno = 0; blkno < nblocks; blkno++)
+#ifdef PLV_TIME
+	pg_rusage_init(&ru_scan);
+#endif
+	while((blkno = lazy_scan_get_nextpage(onerel, lvstate, lvscan,
+										  &all_visible_according_to_vm,
+										  &vmbuffer, options, aggressive)) != InvalidBlockNumber)
 	{
 		Buffer		buf;
 		Page		page;
@@ -597,99 +799,31 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		int			prev_dead_count;
 		int			nfrozen;
 		Size		freespace;
-		bool		all_visible_according_to_vm = false;
 		bool		all_visible;
 		bool		all_frozen = true;	/* provided all_visible is also true */
 		bool		has_dead_tuples;
 		TransactionId visibility_cutoff_xid = InvalidTransactionId;
-
-		/* see note above about forcing scanning of last page */
-#define FORCE_CHECK_PAGE() \
-		(blkno == nblocks - 1 && should_attempt_truncation(vacrelstats))
+		int			dtmax;
+		int			dtcount;
 
 		pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
 
-		if (blkno == next_unskippable_block)
-		{
-			/* Time to advance next_unskippable_block */
-			next_unskippable_block++;
-			if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
-			{
-				while (next_unskippable_block < nblocks)
-				{
-					uint8		vmskipflags;
-
-					vmskipflags = visibilitymap_get_status(onerel,
-														   next_unskippable_block,
-														   &vmbuffer);
-					if (aggressive)
-					{
-						if ((vmskipflags & VISIBILITYMAP_ALL_FROZEN) == 0)
-							break;
-					}
-					else
-					{
-						if ((vmskipflags & VISIBILITYMAP_ALL_VISIBLE) == 0)
-							break;
-					}
-					vacuum_delay_point();
-					next_unskippable_block++;
-				}
-			}
-
-			/*
-			 * We know we can't skip the current block.  But set up
-			 * skipping_blocks to do the right thing at the following blocks.
-			 */
-			if (next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD)
-				skipping_blocks = true;
-			else
-				skipping_blocks = false;
-
-			/*
-			 * Normally, the fact that we can't skip this block must mean that
-			 * it's not all-visible.  But in an aggressive vacuum we know only
-			 * that it's not all-frozen, so it might still be all-visible.
-			 */
-			if (aggressive && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer))
-				all_visible_according_to_vm = true;
-		}
-		else
-		{
-			/*
-			 * The current block is potentially skippable; if we've seen a
-			 * long enough run of skippable blocks to justify skipping it, and
-			 * we're not forced to check it, then go ahead and skip.
-			 * Otherwise, the page must be at least all-visible if not
-			 * all-frozen, so we can set all_visible_according_to_vm = true.
-			 */
-			if (skipping_blocks && !FORCE_CHECK_PAGE())
-			{
-				/*
-				 * Tricky, tricky.  If this is in aggressive vacuum, the page
-				 * must have been all-frozen at the time we checked whether it
-				 * was skippable, but it might not be any more.  We must be
-				 * careful to count it as a skipped all-frozen page in that
-				 * case, or else we'll think we can't update relfrozenxid and
-				 * relminmxid.  If it's not an aggressive vacuum, we don't
-				 * know whether it was all-frozen, so we have to recheck; but
-				 * in this case an approximate answer is OK.
-				 */
-				if (aggressive || VM_ALL_FROZEN(onerel, blkno, &vmbuffer))
-					vacrelstats->frozenskipped_pages++;
-				continue;
-			}
-			all_visible_according_to_vm = true;
-		}
-
 		vacuum_delay_point();
 
 		/*
 		 * If we are close to overrunning the available space for dead-tuple
 		 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
 		 */
-		if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
-			vacrelstats->num_dead_tuples > 0)
+		if (IsDeadTupleShared(lvstate))
+			SpinLockAcquire(&lvstate->dtctl->mutex);
+
+		dtmax = lvstate->dtctl->dt_max;
+		dtcount = lvstate->dtctl->dt_count;
+
+		if (IsDeadTupleShared(lvstate))
+			SpinLockRelease(&lvstate->dtctl->mutex);
+
+		if (((dtmax - dtcount) < MaxHeapTuplesPerPage) && dtcount > 0)
 		{
 			const int	hvp_index[] = {
 				PROGRESS_VACUUM_PHASE,
@@ -697,6 +831,19 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			};
 			int64		hvp_val[2];
 
+#ifdef PLV_TIME
+			elog(WARNING, "%d Scan %s", ParallelWorkerNumber, pg_rusage_show(&ru_scan));
+#endif
+			/*
+			 * Here we are about to actually vacuum the table and indexes.
+			 * Before entering the vacuum state, we have to wait for the other
+			 * vacuum workers to reach this point.
+			 */
+			lazy_prepare_vacuum(lvstate);
+#ifdef PLV_TIME
+			pg_rusage_init(&ru_vacuum);
+#endif
+
 			/*
 			 * Before beginning index vacuuming, we release any pin we may
 			 * hold on the visibility map page.  This isn't necessary for
@@ -716,11 +863,12 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 										 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
 
-			/* Remove index entries */
+			/* Remove assigned index entries */
 			for (i = 0; i < nindexes; i++)
-				lazy_vacuum_index(Irel[i],
-								  &indstats[i],
-								  vacrelstats);
+			{
+				if (IsAssignedIndex(i, lvstate->pstate))
+					lazy_vacuum_index(Irel[i], &indstats[i], lvstate);
+			}
 
 			/*
 			 * Report that we are now vacuuming the heap.  We also increase
@@ -733,19 +881,28 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			pgstat_progress_update_multi_param(2, hvp_index, hvp_val);
 
 			/* Remove tuples from heap */
-			lazy_vacuum_heap(onerel, vacrelstats);
+			lazy_vacuum_heap(onerel, lvstate);
 
+#ifdef PLV_TIME
+			elog(WARNING, "%d VACUUM : %s", ParallelWorkerNumber, pg_rusage_show(&ru_vacuum));
+#endif
 			/*
-			 * Forget the now-vacuumed tuples, and press on, but be careful
-			 * not to reset latestRemovedXid since we want that value to be
-			 * valid.
+			 * Here we have finished vacuuming the heap and indexes and are
+			 * going to begin the next round of the heap scan. Wait until all
+			 * vacuum workers have finished vacuuming; after that, forget the
+			 * now-vacuumed tuples, and press on, but be careful not to reset
+			 * latestRemovedXid since we want that value to be valid.
 			 */
-			vacrelstats->num_dead_tuples = 0;
-			vacrelstats->num_index_scans++;
+			lazy_end_vacuum(lvstate);
+#ifdef PLV_TIME
+			pg_rusage_init(&ru_scan);
+#endif
 
 			/* Report that we are once again scanning the heap */
 			pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+
+			vacrelstats->num_index_scans++;
 		}
 
 		/*
@@ -771,7 +928,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			 * it's OK to skip vacuuming pages we get a lock conflict on. They
 			 * will be dealt with in some future vacuum.
 			 */
-			if (!aggressive && !FORCE_CHECK_PAGE())
+			if (!aggressive && !FORCE_CHECK_PAGE(blkno))
 			{
 				ReleaseBuffer(buf);
 				vacrelstats->pinskipped_pages++;
@@ -923,7 +1080,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		has_dead_tuples = false;
 		nfrozen = 0;
 		hastup = false;
-		prev_dead_count = vacrelstats->num_dead_tuples;
+		prev_dead_count = lvstate->dtctl->dt_count;
 		maxoff = PageGetMaxOffsetNumber(page);
 
 		/*
@@ -962,7 +1119,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			 */
 			if (ItemIdIsDead(itemid))
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				lazy_record_dead_tuple(lvstate, &(tuple.t_self));
 				all_visible = false;
 				continue;
 			}
@@ -1067,7 +1224,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 			if (tupgone)
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				lazy_record_dead_tuple(lvstate, &(tuple.t_self));
 				HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
 													   &vacrelstats->latestRemovedXid);
 				tups_vacuumed += 1;
@@ -1132,13 +1289,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 		/*
 		 * If there are no indexes then we can vacuum the page right now
-		 * instead of doing a second scan.
+		 * instead of doing a second scan. Because each parallel worker uses its
+		 * own dead tuple area, workers can vacuum pages independently.
 		 */
-		if (nindexes == 0 &&
-			vacrelstats->num_dead_tuples > 0)
+		if (Irel == NULL && lvstate->dtctl->dt_count > 0)
 		{
 			/* Remove tuples from heap */
-			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
+			lazy_vacuum_page(onerel, blkno, buf, 0, lvstate, &vmbuffer);
 			has_dead_tuples = false;
 
 			/*
@@ -1146,7 +1303,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			 * not to reset latestRemovedXid since we want that value to be
 			 * valid.
 			 */
-			vacrelstats->num_dead_tuples = 0;
+			lvstate->dtctl->dt_count = 0;
+
 			vacuumed_pages++;
 		}
 
@@ -1249,7 +1407,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		 * page, so remember its free space as-is.  (This path will always be
 		 * taken if there are no indexes.)
 		 */
-		if (vacrelstats->num_dead_tuples == prev_dead_count)
+		if (lvstate->dtctl->dt_count == prev_dead_count)
 			RecordPageWithFreeSpace(onerel, blkno, freespace);
 	}
 
@@ -1264,10 +1422,11 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	vacrelstats->new_dead_tuples = nkeep;
 
 	/* now we can compute the new value for pg_class.reltuples */
-	vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
-														 nblocks,
-														 vacrelstats->tupcount_pages,
-														 num_tuples);
+	if (!lvstate->parallel_mode)
+		vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
+															 nblocks,
+															 vacrelstats->tupcount_pages,
+															 num_tuples);
 
 	/*
 	 * Release any remaining pin on visibility map page.
@@ -1280,13 +1439,25 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 	/* If any tuples need to be deleted, perform final vacuum cycle */
 	/* XXX put a threshold on min number of tuples here? */
-	if (vacrelstats->num_dead_tuples > 0)
+	if (lvstate->dtctl->dt_count > 0)
 	{
 		const int	hvp_index[] = {
 			PROGRESS_VACUUM_PHASE,
 			PROGRESS_VACUUM_NUM_INDEX_VACUUMS
 		};
 		int64		hvp_val[2];
+#ifdef PLV_TIME
+		elog(WARNING, "%d Scan %s", ParallelWorkerNumber, pg_rusage_show(&ru_scan));
+#endif
+		/*
+		 * Here we are about to actually vacuum the table and indexes.
+		 * Before entering the vacuum state, we have to wait for the other
+		 * vacuum workers to reach this point.
+		 */
+		lazy_prepare_vacuum(lvstate);
+#ifdef PLV_TIME
+		pg_rusage_init(&ru_vacuum);
+#endif
 
 		/* Log cleanup info before we touch indexes */
 		vacuum_log_cleanup_info(onerel, vacrelstats);
@@ -1297,9 +1468,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 		/* Remove index entries */
 		for (i = 0; i < nindexes; i++)
-			lazy_vacuum_index(Irel[i],
-							  &indstats[i],
-							  vacrelstats);
+		{
+			if (IsAssignedIndex(i, lvstate->pstate))
+				lazy_vacuum_index(Irel[i], &indstats[i], lvstate);
+		}
 
 		/* Report that we are now vacuuming the heap */
 		hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP;
@@ -1309,8 +1481,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		/* Remove tuples from heap */
 		pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 									 PROGRESS_VACUUM_PHASE_VACUUM_HEAP);
-		lazy_vacuum_heap(onerel, vacrelstats);
+
+		lazy_vacuum_heap(onerel, lvstate);
+
 		vacrelstats->num_index_scans++;
+#ifdef PLV_TIME
+		elog(WARNING, "%d VACUUM : %s", ParallelWorkerNumber, pg_rusage_show(&ru_vacuum));
+#endif
 	}
 
 	/* report all blocks vacuumed; and that we're cleaning up */
@@ -1320,7 +1497,11 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 	/* Do post-vacuum cleanup and statistics update for each index */
 	for (i = 0; i < nindexes; i++)
-		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+	{
+		if (IsAssignedIndex(i, lvstate->pstate))
+			lazy_cleanup_index(Irel[i], indstats[i], lvstate->vacrelstats,
+							   lvstate->parallel_mode ? &(lvstate->indstats[i]) : NULL);
+	}
 
 	/* If no indexes, make log report that lazy_vacuum_heap would've made */
 	if (vacuumed_pages)
@@ -1329,12 +1510,16 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 						RelationGetRelationName(onerel),
 						tups_vacuumed, vacuumed_pages)));
 
+	lv_endscan(lvscan);
+
 	/*
 	 * This is pretty messy, but we split it up so that we can skip emitting
 	 * individual parts of the message when not applicable.
 	 */
 	initStringInfo(&buf);
 	appendStringInfo(&buf,
+					 "------- worker %d TOTAL stats -------\n", ParallelWorkerNumber);
+	appendStringInfo(&buf,
 					 _("%.0f dead row versions cannot be removed yet, oldest xmin: %u\n"),
 					 nkeep, OldestXmin);
 	appendStringInfo(&buf, _("There were %.0f unused item pointers.\n"),
@@ -1362,6 +1547,35 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	pfree(buf.data);
 }
 
+/*
+ * lazy_gather_vacuum_stats() -- Gather vacuum statistics from workers
+ */
+static void
+lazy_gather_vacuum_stats(ParallelContext *pcxt, LVRelStats *vacrelstats)
+{
+	int	i;
+	LVRelStats *lvstats_list;
+
+	lvstats_list = (LVRelStats *) shm_toc_lookup(pcxt->toc, VACUUM_KEY_VACUUM_STATS, false);
+
+	/* Gather each worker stats */
+	for (i = 0; i < pcxt->nworkers_launched; i++)
+	{
+		LVRelStats *wstats = (LVRelStats*) ((char *) lvstats_list + sizeof(LVRelStats) * i);
+
+		vacrelstats->scanned_pages += wstats->scanned_pages;
+		vacrelstats->pinskipped_pages += wstats->pinskipped_pages;
+		vacrelstats->frozenskipped_pages += wstats->frozenskipped_pages;
+		vacrelstats->scanned_tuples += wstats->scanned_tuples;
+		vacrelstats->new_dead_tuples += wstats->new_dead_tuples;
+		vacrelstats->pages_removed += wstats->pages_removed;
+		vacrelstats->tuples_deleted += wstats->tuples_deleted;
+		vacrelstats->nonempty_pages += wstats->nonempty_pages;
+	}
+
+	/* All vacuum workers have the same value of rel_pages */
+	vacrelstats->rel_pages = lvstats_list->rel_pages;
+}
 
 /*
  *	lazy_vacuum_heap() -- second pass over the heap
@@ -1375,18 +1589,24 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
  * process index entry removal in batches as large as possible.
  */
 static void
-lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
+lazy_vacuum_heap(Relation onerel, LVState *lvstate)
 {
 	int			tupindex;
 	int			npages;
 	PGRUsage	ru0;
+	BlockNumber	prev_tblk;
 	Buffer		vmbuffer = InvalidBuffer;
+	ItemPointer	deadtuples = lvstate->deadtuples;
+	LVDeadTupleCtl *dtctl = lvstate->dtctl;
+	BlockNumber	ntuples = 0;
+	StringInfoData	buf;
 
 	pg_rusage_init(&ru0);
 	npages = 0;
 
 	tupindex = 0;
-	while (tupindex < vacrelstats->num_dead_tuples)
+
+	while (tupindex < dtctl->dt_count)
 	{
 		BlockNumber tblk;
 		Buffer		buf;
@@ -1395,7 +1615,40 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 
 		vacuum_delay_point();
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		/*
+		 * If the dead tuple TIDs are shared with all vacuum workers,
+		 * we acquire the lock and advance tupindex before vacuuming.
+		 *
+		 * NB: The maximum number of tuples that can be stored on a single
+		 * page is not a large number in most cases, so it is acceptable
+		 * to use a spinlock here.
+		 */
+		if (IsDeadTupleShared(lvstate))
+		{
+			SpinLockAcquire(&(dtctl->mutex));
+
+			tupindex = dtctl->dt_index;
+
+			if (tupindex >= dtctl->dt_count)
+			{
+				SpinLockRelease(&(dtctl->mutex));
+				break;
+			}
+
+			/* Advance dtctl->dt_index */
+			prev_tblk = tblk = ItemPointerGetBlockNumber(&deadtuples[tupindex]);
+			while(prev_tblk == tblk &&
+				  dtctl->dt_index < dtctl->dt_count)
+			{
+				tblk = ItemPointerGetBlockNumber(&deadtuples[dtctl->dt_index]);
+				dtctl->dt_index++;
+				ntuples++;
+			}
+
+			SpinLockRelease(&(dtctl->mutex));
+		}
+
+		tblk = ItemPointerGetBlockNumber(&deadtuples[tupindex]);
 		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
 								 vac_strategy);
 		if (!ConditionalLockBufferForCleanup(buf))
@@ -1404,7 +1657,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 			++tupindex;
 			continue;
 		}
-		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats,
+		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, lvstate,
 									&vmbuffer);
 
 		/* Now that we've compacted the page, record its available space */
@@ -1422,10 +1675,17 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 		vmbuffer = InvalidBuffer;
 	}
 
+#ifdef PLV_TIME
+	elog(WARNING, "%d TABLE %s", ParallelWorkerNumber, pg_rusage_show(&ru0));
+#endif
+	initStringInfo(&buf);
+	appendStringInfo(&buf,
+					 "------- worker %d VACUUM HEAP stats -------\n", ParallelWorkerNumber);
+	appendStringInfo(&buf,
+					 "\"%s\": removed %d row versions in %d pages",
+					 RelationGetRelationName(onerel), ntuples, npages);
 	ereport(elevel,
-			(errmsg("\"%s\": removed %d row versions in %d pages",
-					RelationGetRelationName(onerel),
-					tupindex, npages),
+			(errmsg("%s", buf.data),
 			 errdetail_internal("%s", pg_rusage_show(&ru0))));
 }
 
@@ -1435,34 +1695,32 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
  *
  * Caller must hold pin and buffer cleanup lock on the buffer.
  *
- * tupindex is the index in vacrelstats->dead_tuples of the first dead
- * tuple for this page.  We assume the rest follow sequentially.
- * The return value is the first tupindex after the tuples of this page.
  */
 static int
 lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
+				 int tupindex, LVState *lvstate, Buffer *vmbuffer)
 {
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber unused[MaxOffsetNumber];
 	int			uncnt = 0;
 	TransactionId visibility_cutoff_xid;
 	bool		all_frozen;
+	LVRelStats	*vacrelstats = lvstate->vacrelstats;
 
 	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
 
 	START_CRIT_SECTION();
 
-	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
+	for (; tupindex < lvstate->dtctl->dt_count; tupindex++)
 	{
 		BlockNumber tblk;
 		OffsetNumber toff;
 		ItemId		itemid;
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&lvstate->deadtuples[tupindex]);
 		if (tblk != blkno)
 			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
+		toff = ItemPointerGetOffsetNumber(&lvstate->deadtuples[tupindex]);
 		itemid = PageGetItemId(page, toff);
 		ItemIdSetUnused(itemid);
 		unused[uncnt++] = toff;
@@ -1587,14 +1845,15 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
  *	lazy_vacuum_index() -- vacuum one index relation.
  *
  *		Delete all the index entries pointing to tuples listed in
- *		vacrelstats->dead_tuples, and update running statistics.
+ *		lvstate->deadtuples, and update running statistics.
  */
 static void
 lazy_vacuum_index(Relation indrel,
 				  IndexBulkDeleteResult **stats,
-				  LVRelStats *vacrelstats)
+				  LVState	*lvstate)
 {
 	IndexVacuumInfo ivinfo;
+	StringInfoData buf;
 	PGRUsage	ru0;
 
 	pg_rusage_init(&ru0);
@@ -1603,17 +1862,25 @@ lazy_vacuum_index(Relation indrel,
 	ivinfo.analyze_only = false;
 	ivinfo.estimated_count = true;
 	ivinfo.message_level = elevel;
-	ivinfo.num_heap_tuples = vacrelstats->old_rel_tuples;
+	ivinfo.num_heap_tuples = lvstate->vacrelstats->old_rel_tuples;
 	ivinfo.strategy = vac_strategy;
 
 	/* Do bulk deletion */
-	*stats = index_bulk_delete(&ivinfo, *stats,
-							   lazy_tid_reaped, (void *) vacrelstats);
+	*stats = index_bulk_delete(&ivinfo, *stats, lazy_tid_reaped, (void *) lvstate);
+
+#ifdef PLV_TIME
+	elog(WARNING, "%d INDEX(%d) %s", ParallelWorkerNumber, RelationGetRelid(indrel),
+		 pg_rusage_show(&ru0));
+#endif
+	initStringInfo(&buf);
+	appendStringInfo(&buf,
+					 "------- worker %d VACUUM INDEX stats -------\n", ParallelWorkerNumber);
+	appendStringInfo(&buf,
+					 "scanned index \"%s\" to remove %d row versions",
+					 RelationGetRelationName(indrel), lvstate->dtctl->dt_count);
 
 	ereport(elevel,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
-					RelationGetRelationName(indrel),
-					vacrelstats->num_dead_tuples),
+			(errmsg("%s", buf.data),
 			 errdetail_internal("%s", pg_rusage_show(&ru0))));
 }
 
@@ -1621,11 +1888,11 @@ lazy_vacuum_index(Relation indrel,
  *	lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
  */
 static void
-lazy_cleanup_index(Relation indrel,
-				   IndexBulkDeleteResult *stats,
-				   LVRelStats *vacrelstats)
+lazy_cleanup_index(Relation indrel, IndexBulkDeleteResult *stats,
+							   LVRelStats *vacrelstats, LVIndStats *indstat)
 {
 	IndexVacuumInfo ivinfo;
+	StringInfoData	buf;
 	PGRUsage	ru0;
 
 	pg_rusage_init(&ru0);
@@ -1639,34 +1906,54 @@ lazy_cleanup_index(Relation indrel,
 
 	stats = index_vacuum_cleanup(&ivinfo, stats);
 
+	/* If needed, the leader process will update the index stats after vacuum */
+	if (indstat)
+		indstat->updated = false;
+
 	if (!stats)
 		return;
 
 	/*
 	 * Now update statistics in pg_class, but only if the index says the count
-	 * is accurate.
+	 * is accurate. In parallel lazy vacuum, the worker cannot update this
+	 * information by itself, so it saves the stats to DSM and the leader
+	 * process updates them later.
 	 */
 	if (!stats->estimated_count)
-		vac_update_relstats(indrel,
-							stats->num_pages,
-							stats->num_index_tuples,
-							0,
-							false,
-							InvalidTransactionId,
-							InvalidMultiXactId,
-							false);
+	{
+		if (indstat)
+		{
+			indstat->updated = true;
+			indstat->num_pages = stats->num_pages;
+			indstat->num_tuples = stats->num_index_tuples;
+		}
+		else
+			vac_update_relstats(indrel,
+								stats->num_pages,
+								stats->num_index_tuples,
+								0,
+								false,
+								InvalidTransactionId,
+								InvalidMultiXactId,
+								false);
+	}
 
+	initStringInfo(&buf);
+	appendStringInfo(&buf,
+					 "------- worker %d CLEANUP INDEX stats -------\n", ParallelWorkerNumber);
+	appendStringInfo(&buf,
+					 "index \"%s\" now contains %.0f row versions in %u pages",
+					 RelationGetRelationName(indrel),
+					 stats->num_index_tuples,
+					 stats->num_pages);
 	ereport(elevel,
-			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
-					RelationGetRelationName(indrel),
-					stats->num_index_tuples,
-					stats->num_pages),
-			 errdetail("%.0f index row versions were removed.\n"
-					   "%u index pages have been deleted, %u are currently reusable.\n"
-					   "%s.",
-					   stats->tuples_removed,
-					   stats->pages_deleted, stats->pages_free,
-					   pg_rusage_show(&ru0))));
+			(errmsg("%s", buf.data),
+					errdetail("%.0f index row versions were removed.\n"
+							  "%u index pages have been deleted, %u are currently reusable.\n"
+							  "%s.",
+							  stats->tuples_removed,
+							  stats->pages_deleted, stats->pages_free,
+							  pg_rusage_show(&ru0))));
 
 	pfree(stats);
 }
@@ -1976,59 +2263,69 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 /*
  * lazy_space_alloc - space allocation decisions for lazy vacuum
  *
+ * In parallel lazy vacuum the space for dead tuple locations has already
+ * been allocated in dynamic shared memory, so we allocate space for dead
+ * tuple locations in local memory only when not in parallel lazy vacuum,
+ * and point lvstate->deadtuples at it.
+ *
  * See the comments at the head of this file for rationale.
  */
 static void
-lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+lazy_space_alloc(LVState *lvstate, BlockNumber relblocks)
 {
-	long		maxtuples;
-	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
-	autovacuum_work_mem != -1 ?
-	autovacuum_work_mem : maintenance_work_mem;
+	long maxtuples;
 
-	if (vacrelstats->hasindex)
-	{
-		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
-		maxtuples = Min(maxtuples, INT_MAX);
-		maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
+	/*
+	 * In parallel mode, the pointer to the dead tuple array has already
+	 * been set during initialization.
+	 */
+	if (lvstate->parallel_mode && lvstate->vacrelstats->nindexes > 0)
+		return;
 
-		/* curious coding here to ensure the multiplication can't overflow */
-		if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks)
-			maxtuples = relblocks * LAZY_ALLOC_TUPLES;
+	maxtuples = lazy_get_max_dead_tuples(lvstate->vacrelstats);
 
-		/* stay sane if small maintenance_work_mem */
-		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
-	}
-	else
-	{
-		maxtuples = MaxHeapTuplesPerPage;
-	}
-
-	vacrelstats->num_dead_tuples = 0;
-	vacrelstats->max_dead_tuples = (int) maxtuples;
-	vacrelstats->dead_tuples = (ItemPointer)
-		palloc(maxtuples * sizeof(ItemPointerData));
+	/*
+	 * When not in parallel lazy vacuum, we need to allocate the dead
+	 * tuple array in local memory.
+	 */
+	lvstate->deadtuples = palloc0(sizeof(ItemPointerData) * (int) maxtuples);
+	lvstate->dtctl = (LVDeadTupleCtl *) palloc(sizeof(LVDeadTupleCtl));
+	lvstate->dtctl->dt_max = (int) maxtuples;
+	lvstate->dtctl->dt_count = 0;
 }
 
 /*
  * lazy_record_dead_tuple - remember one deletable tuple
+ *
+ * We must acquire the spinlock before recording a TID when the dead tuple
+ * TIDs are shared with other vacuum workers.
  */
 static void
-lazy_record_dead_tuple(LVRelStats *vacrelstats,
-					   ItemPointer itemptr)
+lazy_record_dead_tuple(LVState *lvstate, ItemPointer itemptr)
 {
+	LVDeadTupleCtl *dtctl = lvstate->dtctl;
+
+	if (IsDeadTupleShared(lvstate))
+		SpinLockAcquire(&(dtctl->mutex));
+
 	/*
 	 * The array must never overflow, since we rely on all deletable tuples
 	 * being removed; inability to remove a tuple might cause an old XID to
 	 * persist beyond the freeze limit, which could be disastrous later on.
 	 */
-	if (vacrelstats->num_dead_tuples >= vacrelstats->max_dead_tuples)
-		elog(ERROR, "dead tuple array overflow");
+	if (dtctl->dt_count >= dtctl->dt_max)
+	{
+		/* Don't raise the error while still holding the shared spinlock */
+		if (IsDeadTupleShared(lvstate))
+			SpinLockRelease(&(dtctl->mutex));
+		elog(ERROR, "dead tuple array overflow");
+	}
+
-	vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
-	vacrelstats->num_dead_tuples++;
-	pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-								 vacrelstats->num_dead_tuples);
+	lvstate->deadtuples[dtctl->dt_count] = *itemptr;
+	(dtctl->dt_count)++;
+	/* XXX: Update progress information here */
+
+	if (IsDeadTupleShared(lvstate))
+		SpinLockRelease(&(dtctl->mutex));
 }
 
 /*
@@ -2041,16 +2338,23 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats,
 static bool
 lazy_tid_reaped(ItemPointer itemptr, void *state)
 {
-	LVRelStats *vacrelstats = (LVRelStats *) state;
+	LVState *lvstate = (LVState *) state;
 	ItemPointer res;
 
+	/*
+	 * We can assume that the dead tuple TIDs are sorted by TID location
+	 * even when the dead tuple TIDs are shared with other vacuum workers.
+	 */
 	res = (ItemPointer) bsearch((void *) itemptr,
-								(void *) vacrelstats->dead_tuples,
-								vacrelstats->num_dead_tuples,
+								(void *) lvstate->deadtuples,
+								lvstate->dtctl->dt_count,
 								sizeof(ItemPointerData),
 								vac_cmp_itemptr);
 
-	return (res != NULL);
+	if (res != NULL)
+		return true;
+
+	return false;
 }
 
 /*
@@ -2194,3 +2498,622 @@ heap_page_is_all_visible(Relation rel, Buffer buf,
 
 	return all_visible;
 }
+
+/*
+ * Return the block number we need to scan next, or InvalidBlockNumber if scan
+ * is done.
+ *
+ * Except when aggressive is set, we want to skip pages that are
+ * all-visible according to the visibility map, but only when we can skip
+ * at least SKIP_PAGES_THRESHOLD consecutive pages.  Since we're reading
+ * sequentially, the OS should be doing readahead for us, so there's no
+ * gain in skipping a page now and then; that's likely to disable
+ * readahead and so be counterproductive. Also, skipping even a single
+ * page means that we can't update relfrozenxid, so we only want to do it
+ * if we can skip a goodly number of pages.
+ *
+ * When aggressive is set, we can't skip pages just because they are
+ * all-visible, but we can still skip pages that are all-frozen, since
+ * such pages do not need freezing and do not affect the value that we can
+ * safely set for relfrozenxid or relminmxid.
+ *
+ * In non-parallel mode, before entering the main loop, establish the
+ * invariant that next_unskippable_block is the next block number >= blkno
+ * that we can't skip based on the visibility map, either all-visible for a
+ * regular scan or all-frozen for an aggressive scan.  We set it to nblocks
+ * if there's no such block.  We also set up the skipping_blocks flag
+ * correctly at this stage.
+ *
+ * In parallel mode, lvscan->heapscan is valid and we scan heap pages using
+ * the parallel heap scan description. Each worker calls
+ * heap_parallelscan_nextpage() to get the next block number to scan
+ * exclusively. If the given block is all-visible according to the
+ * visibility map, we skip scanning it immediately, unlike the non-parallel
+ * lazy scan.
+ *
+ * Note: The value returned by visibilitymap_get_status could be slightly
+ * out-of-date, since we make this test before reading the corresponding
+ * heap page or locking the buffer.  This is OK.  If we mistakenly think
+ * that the page is all-visible or all-frozen when in fact the flag's just
+ * been cleared, we might fail to vacuum the page.  It's easy to see that
+ * skipping a page when aggressive is not set is not a very big deal; we
+ * might leave some dead tuples lying around, but the next vacuum will
+ * find them.  But even when aggressive *is* set, it's still OK if we miss
+ * a page whose all-frozen marking has just been cleared.  Any new XIDs
+ * just added to that page are necessarily newer than the GlobalXmin we
+ * computed, so they'll have no effect on the value to which we can safely
+ * set relfrozenxid.  A similar argument applies for MXIDs and relminmxid.
+ *
+ * We will scan the table's last page, at least to the extent of
+ * determining whether it has tuples or not, even if it should be skipped
+ * according to the above rules; except when we've already determined that
+ * it's not worth trying to truncate the table.  This avoids having
+ * lazy_truncate_heap() take access-exclusive lock on the table to attempt
+ * a truncation that just fails immediately because there are tuples in
+ * the last page.  This is worth avoiding mainly because such a lock must
+ * be replayed on any hot standby, where it can be disruptive.
+ */
+static BlockNumber
+lazy_scan_get_nextpage(Relation onerel, LVState *lvstate,
+					   LVScanDesc lvscan, bool *all_visible_according_to_vm,
+					   Buffer *vmbuffer, int options, bool aggressive)
+{
+	BlockNumber blkno;
+	LVRelStats	*vacrelstats = lvstate->vacrelstats;
+
+	if (lvstate->parallel_mode)
+	{
+		/*
+		 * In parallel lazy vacuum, since it's hard to know how many
+		 * consecutive all-visible pages exist in the table, we immediately
+		 * skip scanning a heap page if it is all-visible.
+		 */
+		while ((blkno = heap_parallelscan_nextpage(lvscan->heapscan)) != InvalidBlockNumber)
+		{
+			*all_visible_according_to_vm = false;
+			vacuum_delay_point();
+
+			/* Consider skipping this page according to the visibility map */
+			if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0 &&
+				!FORCE_CHECK_PAGE(blkno))
+			{
+				uint8		vmstatus;
+
+				vmstatus = visibilitymap_get_status(onerel, blkno, vmbuffer);
+
+				if (aggressive)
+				{
+					if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0)
+					{
+						vacrelstats->frozenskipped_pages++;
+						continue;
+					}
+					else if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0)
+						*all_visible_according_to_vm = true;
+				}
+				else
+				{
+					if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0)
+					{
+						if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0)
+							vacrelstats->frozenskipped_pages++;
+						continue;
+					}
+				}
+			}
+
+			/* We need to scan the current block, so break */
+			break;
+		}
+	}
+	else
+	{
+		bool skipping_blocks = false;
+
+		/* Initialize lv_next_unskippable_block if needed */
+		if (lvscan->lv_cblock == 0 && (options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
+		{
+			while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks)
+			{
+				uint8		vmstatus;
+
+				vmstatus = visibilitymap_get_status(onerel, lvscan->lv_next_unskippable_block,
+													vmbuffer);
+				if (aggressive)
+				{
+					if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
+						break;
+				}
+				else
+				{
+					if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
+						break;
+				}
+				vacuum_delay_point();
+				lvscan->lv_next_unskippable_block++;
+			}
+
+			if (lvscan->lv_next_unskippable_block >= SKIP_PAGES_THRESHOLD)
+				skipping_blocks = true;
+			else
+				skipping_blocks = false;
+		}
+
+		/* Decide the block number we need to scan */
+		for (blkno = lvscan->lv_cblock; blkno < lvscan->lv_nblocks; blkno++)
+		{
+			if (blkno == lvscan->lv_next_unskippable_block)
+			{
+				/* Time to advance next_unskippable_block */
+				lvscan->lv_next_unskippable_block++;
+				if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
+				{
+					while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks)
+					{
+						uint8		vmstatus;
+
+						vmstatus = visibilitymap_get_status(onerel,
+															lvscan->lv_next_unskippable_block,
+															vmbuffer);
+						if (aggressive)
+						{
+							if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
+								break;
+						}
+						else
+						{
+							if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
+								break;
+						}
+						vacuum_delay_point();
+						lvscan->lv_next_unskippable_block++;
+					}
+				}
+
+				/*
+				 * We know we can't skip the current block.  But set up
+				 * skipping_all_visible_blocks to do the right thing at the
+				 * following blocks.
+				 */
+				if (lvscan->lv_next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD)
+					skipping_blocks = true;
+				else
+					skipping_blocks = false;
+
+				/*
+				 * Normally, the fact that we can't skip this block must mean that
+				 * it's not all-visible.  But in an aggressive vacuum we know only
+				 * that it's not all-frozen, so it might still be all-visible.
+				 */
+				if (aggressive && VM_ALL_VISIBLE(onerel, blkno, vmbuffer))
+					*all_visible_according_to_vm = true;
+
+				/* Found the next unskippable block number */
+				break;
+			}
+			else
+			{
+				/*
+				 * The current block is potentially skippable; if we've seen a
+				 * long enough run of skippable blocks to justify skipping it, and
+				 * we're not forced to check it, then go ahead and skip.
+				 * Otherwise, the page must be at least all-visible if not
+				 * all-frozen, so we can set all_visible_according_to_vm = true.
+				 */
+				if (skipping_blocks && !FORCE_CHECK_PAGE(blkno))
+				{
+					/*
+					 * Tricky, tricky.  If this is in aggressive vacuum, the page
+					 * must have been all-frozen at the time we checked whether it
+					 * was skippable, but it might not be any more.  We must be
+					 * careful to count it as a skipped all-frozen page in that
+					 * case, or else we'll think we can't update relfrozenxid and
+					 * relminmxid.  If it's not an aggressive vacuum, we don't
+					 * know whether it was all-frozen, so we have to recheck; but
+					 * in this case an approximate answer is OK.
+					 */
+					if (aggressive || VM_ALL_FROZEN(onerel, blkno, vmbuffer))
+						vacrelstats->frozenskipped_pages++;
+					continue;
+				}
+
+				*all_visible_according_to_vm = true;
+
+				/* We need to scan the current block, so break */
+				break;
+			}
+		} /* for */
+
+		/* Advance the current block number for the next scan */
+		lvscan->lv_cblock = blkno + 1;
+	}
+
+	return (blkno == lvscan->lv_nblocks) ? InvalidBlockNumber : blkno;
+}
+
+/*
+ * Begin lazy vacuum scan. lvscan->heapscan is NULL if
+ * we're not in parallel lazy vacuum.
+ */
+static LVScanDesc
+lv_beginscan(Relation onerel, ParallelHeapScanDesc pscan)
+{
+	LVScanDesc lvscan;
+
+	lvscan = (LVScanDesc) palloc(sizeof(LVScanDescData));
+
+	lvscan->lv_cblock = 0;
+	lvscan->lv_next_unskippable_block = 0;
+	lvscan->lv_nblocks = RelationGetNumberOfBlocks(onerel);
+
+	if (pscan != NULL)
+		lvscan->heapscan = heap_beginscan_parallel(onerel, pscan);
+	else
+		lvscan->heapscan = NULL;
+
+	return lvscan;
+}
+
+/*
+ * End lazy vacuum scan.
+ */
+static void
+lv_endscan(LVScanDesc lvscan)
+{
+	if (lvscan->heapscan != NULL)
+		heap_endscan(lvscan->heapscan);
+	pfree(lvscan);
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Lazy Vacuum Support
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * Estimate storage for parallel lazy vacuum.
+ */
+static void
+lazy_estimate_dsm(ParallelContext *pcxt, LVRelStats *vacrelstats)
+{
+	Size size = 0;
+	int keys = 0;
+	int vacuum_workers = pcxt->nworkers + 1;	/* parallel workers plus the leader */
+	long maxtuples = lazy_get_max_dead_tuples(vacrelstats);
+
+	/* Estimate size for parallel heap scan */
+	size += heap_parallelscan_estimate(SnapshotAny);
+	keys++;
+
+	/* Estimate size for vacuum statistics (one slot per worker) */
+	size += BUFFERALIGN(mul_size(sizeof(LVRelStats), pcxt->nworkers));
+	keys++;
+
+	/* We have to share dead tuple information only when the table has indexes */
+	if (vacrelstats->nindexes > 0)
+	{
+		/* Estimate size for index statistics */
+		size += BUFFERALIGN(mul_size(sizeof(LVIndStats), vacrelstats->nindexes));
+		keys++;
+
+		/* Estimate size for dead tuple control */
+		size += BUFFERALIGN(sizeof(LVDeadTupleCtl));
+		keys++;
+
+		/* Estimate size for dead tuple array */
+		size += BUFFERALIGN(mul_size(
+							 mul_size(sizeof(ItemPointerData), maxtuples),
+							 vacuum_workers));
+		keys++;
+	}
+
+	/* Estimate size for parallel lazy vacuum state */
+	size += BUFFERALIGN(sizeof(LVParallelState));
+	keys++;
+
+	/* Estimate size for vacuum task */
+	size += BUFFERALIGN(sizeof(VacuumInfo));
+	keys++;
+
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, keys);
+}
+
+/*
+ * Initialize dynamic shared memory for parallel lazy vacuum. We store the
+ * information relevant to the parallel heap scan, the dead tuple array,
+ * per-worker vacuum statistics and some parameters for lazy vacuum.
+ */
+static void
+lazy_initialize_dsm(ParallelContext *pcxt, Relation onerel, LVState *lvstate,
+					int options, bool aggressive)
+{
+	ParallelHeapScanDesc pscan_ptr;
+	ItemPointer	deadtuples_ptr;
+	char 		*lvrelstats_ptr;
+	LVParallelState *pstate_ptr;
+	LVIndStats	*indstats_ptr;
+	LVDeadTupleCtl	*dtctl_ptr;
+	int i;
+	int deadtuples_size;
+	int lvrelstats_size;
+	int	vacuum_workers = pcxt->nworkers + 1;
+	long max_tuples = lazy_get_max_dead_tuples(lvstate->vacrelstats);
+
+	/* Allocate and initialize DSM for vacuum stats for each worker */
+	lvrelstats_size = mul_size(sizeof(LVRelStats), pcxt->nworkers);
+	lvrelstats_ptr = shm_toc_allocate(pcxt->toc, lvrelstats_size);
+	for (i = 0; i < pcxt->nworkers; i++)
+	{
+		char *start;
+
+		start = lvrelstats_ptr + i * sizeof(LVRelStats);
+		memcpy(start, lvstate->vacrelstats, sizeof(LVRelStats));
+	}
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_VACUUM_STATS, lvrelstats_ptr);
+
+	if (lvstate->vacrelstats->nindexes > 0)
+	{
+		/* Allocate and initialize DSM for dead tuple control */
+		dtctl_ptr = (LVDeadTupleCtl *) shm_toc_allocate(pcxt->toc, sizeof(LVDeadTupleCtl));
+		SpinLockInit(&(dtctl_ptr->mutex));
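+
+		/*
+		 * The shared dead tuple array is sized for every vacuum process
+		 * (parallel workers plus the leader), so its capacity is the
+		 * per-backend maximum multiplied by the number of vacuum processes.
+		 */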
+		dtctl_ptr->dt_max = max_tuples * vacuum_workers;
+		dtctl_ptr->dt_count = 0;
+		dtctl_ptr->dt_index = 0;
+		lvstate->dtctl = dtctl_ptr;
+		shm_toc_insert(pcxt->toc, VACUUM_KEY_DEAD_TUPLE_CTL, dtctl_ptr);
+
+		/* Allocate and initialize DSM for dead tuple array */
+		deadtuples_size = mul_size(mul_size(sizeof(ItemPointerData), max_tuples),
+								   vacuum_workers);
+		deadtuples_ptr = (ItemPointer) shm_toc_allocate(pcxt->toc,
+														deadtuples_size);
+		shm_toc_insert(pcxt->toc, VACUUM_KEY_DEAD_TUPLES, deadtuples_ptr);
+		lvstate->deadtuples = deadtuples_ptr;
+
+		/* Allocate DSM for index statistics */
+		indstats_ptr = (LVIndStats *) shm_toc_allocate(pcxt->toc,
+													   mul_size(sizeof(LVIndStats),
+																lvstate->vacrelstats->nindexes));
+		shm_toc_insert(pcxt->toc, VACUUM_KEY_INDEX_STATS, indstats_ptr);
+		lvstate->indstats = indstats_ptr;
+	}
+
+	/* Allocate and initialize DSM for parallel scan description */
+	pscan_ptr = (ParallelHeapScanDesc) shm_toc_allocate(pcxt->toc,
+														heap_parallelscan_estimate(SnapshotAny));
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_PARALLEL_SCAN, pscan_ptr);
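+
+	/*
+	 * The parallel heap scan is used only to hand out block numbers via
+	 * heap_parallelscan_nextpage(), so SnapshotAny is sufficient here.
+	 */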
+	heap_parallelscan_initialize(pscan_ptr, onerel, SnapshotAny);
+	lvstate->pscan = pscan_ptr;
+
+	/* Allocate and initialize DSM for parallel vacuum state */
+	pstate_ptr = (LVParallelState *) shm_toc_allocate(pcxt->toc, sizeof(LVParallelState));
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_PARALLEL_STATE, pstate_ptr);
+
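+	/*
+	 * The parallel vacuum state starts in VACSTATE_SCAN; lazy_prepare_vacuum()
+	 * and lazy_end_vacuum() later move all participants between VACSTATE_SCAN
+	 * and VACSTATE_VACUUM in lockstep.
+	 */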
+	ConditionVariableInit(&(pstate_ptr->cv));
+	SpinLockInit(&(pstate_ptr->mutex));
+	pstate_ptr->nworkers = vacuum_workers;
+	pstate_ptr->state = VACSTATE_SCAN;
+	pstate_ptr->info.aggressive = aggressive;
+	pstate_ptr->info.options = options;
+	pstate_ptr->info.oldestxmin = OldestXmin;
+	pstate_ptr->info.freezelimit = FreezeLimit;
+	pstate_ptr->info.multixactcutoff = MultiXactCutoff;
+	pstate_ptr->info.elevel = elevel;
+	lvstate->pstate = pstate_ptr;
+}
+
+/*
+ * Initialize parallel lazy vacuum for worker.
+ */
+static LVState *
+lazy_initialize_worker(shm_toc *toc)
+{
+	LVState	*lvstate;
+	char *lvstats;
+
+	lvstate = (LVState *) palloc(sizeof(LVState));
+	lvstate->parallel_mode = true;
+
+	/* Set up vacuum stats */
+	lvstats = shm_toc_lookup(toc, VACUUM_KEY_VACUUM_STATS, false);
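+
+	/*
+	 * Each worker has its own LVRelStats slot in DSM, indexed by
+	 * ParallelWorkerNumber, so it can update statistics without locking.
+	 */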
+	lvstate->vacrelstats = (LVRelStats *) (lvstats +
+										   sizeof(LVRelStats) * ParallelWorkerNumber);
+
+	if (lvstate->vacrelstats->nindexes > 0)
+	{
+		/* Set up dead tuple control */
+		lvstate->dtctl = (LVDeadTupleCtl *) shm_toc_lookup(toc, VACUUM_KEY_DEAD_TUPLE_CTL, false);
+
+		/* Set up dead tuple array */
+		lvstate->deadtuples = (ItemPointer) shm_toc_lookup(toc, VACUUM_KEY_DEAD_TUPLES, false);
+
+		/* Set up index statistics */
+		lvstate->indstats = (LVIndStats *) shm_toc_lookup(toc, VACUUM_KEY_INDEX_STATS, false);
+	}
+
+	/* Set up parallel vacuum state */
+	lvstate->pstate = (LVParallelState *) shm_toc_lookup(toc, VACUUM_KEY_PARALLEL_STATE, false);
+
+	/* Set up parallel heap scan description */
+	lvstate->pscan = (ParallelHeapScanDesc) shm_toc_lookup(toc, VACUUM_KEY_PARALLEL_SCAN, false);
+
+	/* Set up static vacuum parameters passed from the leader via DSM */
+	OldestXmin = lvstate->pstate->info.oldestxmin;
+	FreezeLimit = lvstate->pstate->info.freezelimit;
+	MultiXactCutoff = lvstate->pstate->info.multixactcutoff;
+	elevel = lvstate->pstate->info.elevel;
+
+	return lvstate;
+}
+
+/*
+ * At the end of the actual vacuuming of the table and indexes, we have to
+ * wait for all other vacuum workers to reach here before clearing the dead
+ * tuple TID information.
+ */
+static void
+lazy_end_vacuum(LVState *lvstate)
+{
+	LVParallelState *pstate = lvstate->pstate;
+	bool counted = false;
+
+	/* If not in parallel vacuum, just clear the dead tuple TIDs and exit */
+	if (!lvstate->parallel_mode)
+	{
+		lvstate->dtctl->dt_count = 0;
+		return;
+	}
+
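+	/*
+	 * This loop acts as a barrier: each process increments finish_count once,
+	 * then sleeps on the condition variable until the leader observes that
+	 * all launched workers (plus itself) have arrived, clears the dead tuple
+	 * TIDs, and resets the state to VACSTATE_SCAN.
+	 */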
+	while (true)
+	{
+		int finish_count;
+		int state;
+
+		SpinLockAcquire(&(pstate->mutex));
+
+		/* Fetch shared information */
+		if (!counted)
+			pstate->finish_count++;
+		finish_count = pstate->finish_count;
+		state = pstate->state;
+
+		SpinLockRelease(&(pstate->mutex));
+
+		if (state == VACSTATE_SCAN)
+			break;
+
+		/*
+		 * Wake up other workers if this is the first time we have reached
+		 * here and we are a parallel worker.
+		 */
+		if (!counted && IsParallelWorker())
+			ConditionVariableBroadcast(&pstate->cv);
+
+		counted = true;
+
+		/*
+		 * If all launched parallel vacuum workers have reached here, the
+		 * leader can clear the dead tuple TID information.
+		 */
+		if (!IsParallelWorker() && finish_count == (lvstate->pcxt->nworkers_launched + 1))
+		{
+			/* Clear dead tuples */
+			lvstate->dtctl->dt_count = 0;
+
+			/* need spinlock ? */
+			pstate->finish_count = 0;
+			pstate->state = VACSTATE_SCAN;
+
+			ConditionVariableBroadcast(&pstate->cv);
+			break;
+		}
+
+		ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_VACUUM_DONE);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * Before starting the actual vacuuming of the table and indexes, we have to
+ * wait for all other vacuum workers so that every worker sees the same dead
+ * tuple TID information when vacuuming.
+ */
+static void
+lazy_prepare_vacuum(LVState *lvstate)
+{
+	LVParallelState *pstate = lvstate->pstate;
+	bool counted = false;
+
+	/* Exit if not in parallel vacuum */
+	if (!lvstate->parallel_mode)
+		return;
+
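+	/*
+	 * Same barrier protocol as lazy_end_vacuum(): wait until every vacuum
+	 * process has finished collecting dead tuple TIDs; the leader then sorts
+	 * the shared array and advances the state to VACSTATE_VACUUM.
+	 */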
+	while (true)
+	{
+		int finish_count;
+		int state;
+
+		SpinLockAcquire(&(pstate->mutex));
+
+		if (!counted)
+			pstate->finish_count++;
+		state = pstate->state;
+		finish_count = pstate->finish_count;
+
+		SpinLockRelease(&(pstate->mutex));
+
+		if (state == VACSTATE_VACUUM)
+			break;
+
+		/*
+		 * Wake up other workers if this is the first time we have reached
+		 * here and we are a parallel worker.
+		 */
+		if (!counted && IsParallelWorker())
+			ConditionVariableBroadcast(&pstate->cv);
+
+		counted = true;
+
+		/*
+		 * The leader process can change the parallel vacuum state once all
+		 * workers have reached here.
+		 */
+		if (!IsParallelWorker() && finish_count == (lvstate->pcxt->nworkers_launched + 1))
+		{
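+			/*
+			 * Sort the shared dead tuple TIDs so that lazy_tid_reaped() can
+			 * look them up with bsearch() during index vacuuming.
+			 */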
+			qsort((void *) lvstate->deadtuples, lvstate->dtctl->dt_count,
+				  sizeof(ItemPointerData), vac_cmp_itemptr);
+
+			/* XXX: need spinlock ? */
+			pstate->finish_count = 0;
+			pstate->state = VACSTATE_VACUUM;
+
+			ConditionVariableBroadcast(&pstate->cv);
+			break;
+		}
+
+		ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_VACUUM_PREPARE);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * Return the maximum number of dead tuples that can be stored according
+ * to vac_work_mem.
+ */
+static long
+lazy_get_max_dead_tuples(LVRelStats *vacrelstats)
+{
+	long maxtuples;
+	int	vac_work_mem = IsAutoVacuumWorkerProcess() &&
+		autovacuum_work_mem != -1 ?
+		autovacuum_work_mem : maintenance_work_mem;
+
+	if (vacrelstats->nindexes != 0)
+	{
+		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
+		maxtuples = Min(maxtuples, INT_MAX);
+		maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
+
+		/* curious coding here to ensure the multiplication can't overflow */
+		if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > vacrelstats->old_rel_pages)
+			maxtuples = vacrelstats->old_rel_pages * LAZY_ALLOC_TUPLES;
+
+		/* stay sane if small maintenance_work_mem */
+		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
+	}
+	else
+	{
+		maxtuples = MaxHeapTuplesPerPage;
+	}
+
+	return maxtuples;
+}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 7a70001..8cba9c8 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1662,7 +1662,12 @@ _equalDropdbStmt(const DropdbStmt *a, const DropdbStmt *b)
 static bool
 _equalVacuumStmt(const VacuumStmt *a, const VacuumStmt *b)
 {
-	COMPARE_SCALAR_FIELD(options);
+	if (a->options.flags != b->options.flags)
+		return false;
+
+	if (a->options.nworkers != b->options.nworkers)
+		return false;
+
 	COMPARE_NODE_FIELD(rels);
 
 	return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4c83a63..85f5d07 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -187,6 +187,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 			   bool *deferrable, bool *initdeferred, bool *not_valid,
 			   bool *no_inherit, core_yyscan_t yyscanner);
 static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
+static VacuumOptions *makeVacOpt(VacuumOption flag, int nworkers);
 
 %}
 
@@ -237,6 +238,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	struct ImportQual	*importqual;
 	InsertStmt			*istmt;
 	VariableSetStmt		*vsetstmt;
+	VacuumOptions		*vacopts;
 	PartitionElem		*partelem;
 	PartitionSpec		*partspec;
 	PartitionBoundSpec	*partboundspec;
@@ -305,7 +307,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				create_extension_opt_item alter_extension_opt_item
 
 %type <ival>	opt_lock lock_type cast_context
-%type <ival>	vacuum_option_list vacuum_option_elem
+%type <vacopts>	vacuum_option_list vacuum_option_elem
 %type <boolean>	opt_or_replace
 				opt_grant_grant_option opt_grant_admin_option
 				opt_nowait opt_if_exists opt_with_data
@@ -10152,32 +10154,40 @@ cluster_index_specification:
 VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_VACUUM;
+					VacuumOptions *vacopts = makeVacOpt(VACOPT_VACUUM, 1);
 					if ($2)
-						n->options |= VACOPT_FULL;
+						vacopts->flags |= VACOPT_FULL;
 					if ($3)
-						n->options |= VACOPT_FREEZE;
+						vacopts->flags |= VACOPT_FREEZE;
 					if ($4)
-						n->options |= VACOPT_VERBOSE;
+						vacopts->flags |= VACOPT_VERBOSE;
+
 					n->rels = $5;
+					n->options.flags = vacopts->flags;
+					n->options.nworkers = 0;
 					$$ = (Node *)n;
+					pfree(vacopts);
 				}
 			| VACUUM opt_full opt_freeze opt_verbose AnalyzeStmt
 				{
 					VacuumStmt *n = (VacuumStmt *) $5;
-					n->options |= VACOPT_VACUUM;
+					n->options.flags |= VACOPT_VACUUM;
 					if ($2)
-						n->options |= VACOPT_FULL;
+						n->options.flags |= VACOPT_FULL;
 					if ($3)
-						n->options |= VACOPT_FREEZE;
+						n->options.flags |= VACOPT_FREEZE;
 					if ($4)
-						n->options |= VACOPT_VERBOSE;
+						n->options.flags |= VACOPT_VERBOSE;
+					n->options.nworkers = 0;
 					$$ = (Node *)n;
 				}
 			| VACUUM '(' vacuum_option_list ')' opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_VACUUM | $3;
+					VacuumOptions	*vacopts = $3;
+
+					n->options.flags = vacopts->flags | VACOPT_VACUUM;
+					n->options.nworkers = vacopts->nworkers;
 					n->rels = $5;
 					$$ = (Node *) n;
 				}
@@ -10185,18 +10195,38 @@ VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_vacuum_relation_list
 
 vacuum_option_list:
 			vacuum_option_elem								{ $$ = $1; }
-			| vacuum_option_list ',' vacuum_option_elem		{ $$ = $1 | $3; }
+			| vacuum_option_list ',' vacuum_option_elem
+			{
+				VacuumOptions *vacopts1 = (VacuumOptions *)$1;
+				VacuumOptions *vacopts2 = (VacuumOptions *)$3;
+
+				vacopts1->flags |= vacopts2->flags;
+				if (vacopts2->flags == VACOPT_PARALLEL)
+					vacopts1->nworkers = vacopts2->nworkers;
+
+				$$ = vacopts1;
+				pfree(vacopts2);
+			}
 		;
 
 vacuum_option_elem:
-			analyze_keyword		{ $$ = VACOPT_ANALYZE; }
-			| VERBOSE			{ $$ = VACOPT_VERBOSE; }
-			| FREEZE			{ $$ = VACOPT_FREEZE; }
-			| FULL				{ $$ = VACOPT_FULL; }
+			analyze_keyword		{ $$ = makeVacOpt(VACOPT_ANALYZE, 0); }
+			| VERBOSE			{ $$ = makeVacOpt(VACOPT_VERBOSE, 0); }
+			| FREEZE			{ $$ = makeVacOpt(VACOPT_FREEZE, 0); }
+			| FULL				{ $$ = makeVacOpt(VACOPT_FULL, 0); }
+			| PARALLEL ICONST
+				{
+					if ($2 < 1)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("parallel vacuum degree must be at least 1"),
+								 parser_errposition(@1)));
+					$$ = makeVacOpt(VACOPT_PARALLEL, $2);
+				}
 			| IDENT
 				{
 					if (strcmp($1, "disable_page_skipping") == 0)
-						$$ = VACOPT_DISABLE_PAGE_SKIPPING;
+						$$ = makeVacOpt(VACOPT_DISABLE_PAGE_SKIPPING, 0);
 					else
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
@@ -10208,11 +10238,16 @@ vacuum_option_elem:
 AnalyzeStmt: analyze_keyword opt_verbose opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_ANALYZE;
+					VacuumOptions *vacopts = makeVacOpt(VACOPT_ANALYZE, 1);
+
 					if ($2)
-						n->options |= VACOPT_VERBOSE;
+						vacopts->flags |= VACOPT_VERBOSE;
+
+					n->options.flags = vacopts->flags;
+					n->options.nworkers = 0;
 					n->rels = $3;
 					$$ = (Node *)n;
+					pfree(vacopts);
 				}
 		;
 
@@ -15915,6 +15950,16 @@ makeRecursiveViewSelect(char *relname, List *aliases, Node *query)
 	return (Node *) s;
 }
 
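+/*
+ * makeVacOpt -
+ *	  create a VacuumOptions struct with the given flags and worker count
+ */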
+static VacuumOptions *
+makeVacOpt(VacuumOption flag, int nworkers)
+{
+	VacuumOptions *vacopt = palloc(sizeof(VacuumOptions));
+
+	vacopt->flags = flag;
+	vacopt->nworkers = nworkers;
+	return vacopt;
+}
+
 /* parser_init()
  * Initialize to parse one query string
  */
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index c04c0b5..ee7e6be 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -189,7 +189,7 @@ typedef struct av_relation
 typedef struct autovac_table
 {
 	Oid			at_relid;
-	int			at_vacoptions;	/* bitmask of VacuumOption */
+	VacuumOptions at_vacoptions;	/* contains bitmask of VacuumOption */
 	VacuumParams at_params;
 	int			at_vacuum_cost_delay;
 	int			at_vacuum_cost_limit;
@@ -2466,7 +2466,7 @@ do_autovacuum(void)
 			 * next table in our list.
 			 */
 			HOLD_INTERRUPTS();
-			if (tab->at_vacoptions & VACOPT_VACUUM)
+			if (tab->at_vacoptions.flags & VACOPT_VACUUM)
 				errcontext("automatic vacuum of table \"%s.%s.%s\"",
 						   tab->at_datname, tab->at_nspname, tab->at_relname);
 			else
@@ -2867,10 +2867,11 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
 		tab = palloc(sizeof(autovac_table));
 		tab->at_relid = relid;
 		tab->at_sharedrel = classForm->relisshared;
-		tab->at_vacoptions = VACOPT_SKIPTOAST |
+		tab->at_vacoptions.flags = VACOPT_SKIPTOAST |
 			(dovacuum ? VACOPT_VACUUM : 0) |
 			(doanalyze ? VACOPT_ANALYZE : 0) |
 			(!wraparound ? VACOPT_NOWAIT : 0);
+		tab->at_vacoptions.nworkers = 1;
 		tab->at_params.freeze_min_age = freeze_min_age;
 		tab->at_params.freeze_table_age = freeze_table_age;
 		tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
@@ -3116,10 +3117,10 @@ autovac_report_activity(autovac_table *tab)
 	int			len;
 
 	/* Report the command and possible options */
-	if (tab->at_vacoptions & VACOPT_VACUUM)
+	if (tab->at_vacoptions.flags & VACOPT_VACUUM)
 		snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
 				 "autovacuum: VACUUM%s",
-				 tab->at_vacoptions & VACOPT_ANALYZE ? " ANALYZE" : "");
+				 tab->at_vacoptions.flags & VACOPT_ANALYZE ? " ANALYZE" : "");
 	else
 		snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
 				 "autovacuum: ANALYZE");
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 3a0b49c..93e1138 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3610,6 +3610,12 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
 			event_name = "ParallelBitmapScan";
 			break;
+		case WAIT_EVENT_PARALLEL_VACUUM_PREPARE:
+			event_name = "ParallelVacuumPrepare";
+			break;
+		case WAIT_EVENT_PARALLEL_VACUUM_DONE:
+			event_name = "ParallelVacuumDone";
+			break;
 		case WAIT_EVENT_PROCARRAY_GROUP_UPDATE:
 			event_name = "ProcArrayGroupUpdate";
 			break;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 82a707a..ead81ff 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -669,7 +669,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				VacuumStmt *stmt = (VacuumStmt *) parsetree;
 
 				/* we choose to allow this during "read only" transactions */
-				PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
+				PreventCommandDuringRecovery((stmt->options.flags & VACOPT_VACUUM) ?
 											 "VACUUM" : "ANALYZE");
 				/* forbidden in parallel mode due to CommandIsReadOnly */
 				ExecVacuum(stmt, isTopLevel);
@@ -2498,7 +2498,7 @@ CreateCommandTag(Node *parsetree)
 			break;
 
 		case T_VacuumStmt:
-			if (((VacuumStmt *) parsetree)->options & VACOPT_VACUUM)
+			if (((VacuumStmt *) parsetree)->options.flags & VACOPT_VACUUM)
 				tag = "VACUUM";
 			else
 				tag = "ANALYZE";
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 294ab70..159f2e4 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -2040,7 +2040,6 @@ EstimateSnapshotSpace(Snapshot snap)
 	Size		size;
 
 	Assert(snap != InvalidSnapshot);
-	Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
 
 	/* We allocate any XID arrays needed in the same palloc block. */
 	size = add_size(sizeof(SerializedSnapshotData),
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4e41024..57bea54 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -132,6 +132,7 @@ extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
 							 Relation relation, Snapshot snapshot);
 extern void heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan);
 extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
+extern BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 
 extern bool heap_fetch(Relation relation, Snapshot snapshot,
 		   HeapTuple tuple, Buffer *userbuf, bool keep_buf,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 7a7b793..e7950d5 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -15,6 +15,8 @@
 #define VACUUM_H
 
 #include "access/htup.h"
+#include "access/heapam.h"
+#include "access/parallel.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
 #include "nodes/parsenodes.h"
@@ -157,7 +159,7 @@ extern int	vacuum_multixact_freeze_table_age;
 
 /* in commands/vacuum.c */
 extern void ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel);
-extern void vacuum(int options, List *relations, VacuumParams *params,
+extern void vacuum(VacuumOptions options, List *relations, VacuumParams *params,
 	   BufferAccessStrategy bstrategy, bool isTopLevel);
 extern void vac_open_indexes(Relation relation, LOCKMODE lockmode,
 				 int *nindexes, Relation **Irel);
@@ -187,8 +189,9 @@ extern void vac_update_datfrozenxid(void);
 extern void vacuum_delay_point(void);
 
 /* in commands/vacuumlazy.c */
-extern void lazy_vacuum_rel(Relation onerel, int options,
+extern void lazy_vacuum_rel(Relation onerel, VacuumOptions options,
 				VacuumParams *params, BufferAccessStrategy bstrategy);
+extern void LazyVacuumWorkerMain(dsm_segment *seg, shm_toc *toc);
 
 /* in commands/analyze.c */
 extern void analyze_rel(Oid relid, RangeVar *relation, int options,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 50eec73..d9ef28c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3095,9 +3095,16 @@ typedef enum VacuumOption
 	VACOPT_FULL = 1 << 4,		/* FULL (non-concurrent) vacuum */
 	VACOPT_NOWAIT = 1 << 5,		/* don't wait to get lock (autovacuum only) */
 	VACOPT_SKIPTOAST = 1 << 6,	/* don't process the TOAST table, if any */
-	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7	/* don't skip any pages */
+	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7,	/* don't skip any pages */
+	VACOPT_PARALLEL = 1 << 8	/* do VACUUM parallelly */
 } VacuumOption;
 
+typedef struct VacuumOptions
+{
+	VacuumOption flags; /* OR of VacuumOption flags */
+	int nworkers; /* # of parallel vacuum workers */
+} VacuumOptions;
+
 /*
  * Info about a single target table of VACUUM/ANALYZE.
  *
@@ -3116,7 +3123,7 @@ typedef struct VacuumRelation
 typedef struct VacuumStmt
 {
 	NodeTag		type;
-	int			options;		/* OR of VacuumOption flags */
+	VacuumOptions	options;
 	List	   *rels;			/* list of VacuumRelation, or NIL for all */
 } VacuumStmt;
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 089b7c3..e87a7bc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -811,6 +811,8 @@ typedef enum
 	WAIT_EVENT_MQ_SEND,
 	WAIT_EVENT_PARALLEL_FINISH,
 	WAIT_EVENT_PARALLEL_BITMAP_SCAN,
+	WAIT_EVENT_PARALLEL_VACUUM_PREPARE,
+	WAIT_EVENT_PARALLEL_VACUUM_DONE,
 	WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
 	WAIT_EVENT_CLOG_GROUP_UPDATE,
 	WAIT_EVENT_REPLICATION_ORIGIN_DROP,
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index c440c7e..b43ef0a 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -80,6 +80,9 @@ CONTEXT:  SQL function "do_analyze" statement 1
 SQL function "wrap_do_analyze" statement 1
 VACUUM FULL vactst;
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL 2) vactst;
+VACUUM (PARALLEL 2, DISABLE_PAGE_SKIPPING) vactst;
+VACUUM (PARALLEL 2, FREEZE) vactst;
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
 CREATE TABLE vacparted1 PARTITION OF vacparted FOR VALUES IN (1);
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 92eaca2..ab9bc4c 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -61,6 +61,9 @@ VACUUM FULL vaccluster;
 VACUUM FULL vactst;
 
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL 2) vactst;
+VACUUM (PARALLEL 2, DISABLE_PAGE_SKIPPING) vactst;
+VACUUM (PARALLEL 2, FREEZE) vactst;
 
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
