From 6ada025d2b3ca57bb071556a63d8f56e908562b6 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 4 Mar 2019 09:31:41 +0900
Subject: [PATCH v16 2/3] Add parallel option to VACUUM command

In parallel vacuum, we do both index vacuum and cleanup vacuum
in parallel with parallel worker processes if the table has
more than one index. All processes including the leader process
process indexes one by one.

Parallel vacuum can be performed by specifying like
VACUUM (PARALLEL 2) tbl, meaning that performing vacuum with 2
parallel worker processes.

The parallel vacuum degree is limited by both the number of
indexes the table has and max_parallel_maintenance_workers.
---
 doc/src/sgml/config.sgml              |  14 +-
 doc/src/sgml/ref/vacuum.sgml          |  20 +
 src/backend/access/heap/vacuumlazy.c  | 855 ++++++++++++++++++++++++++++++----
 src/backend/access/transam/parallel.c |   4 +
 src/backend/commands/vacuum.c         |   6 +
 src/backend/nodes/copyfuncs.c         |   1 +
 src/backend/nodes/equalfuncs.c        |   1 +
 src/backend/parser/gram.y             |  62 ++-
 src/backend/postmaster/autovacuum.c   |   1 +
 src/bin/psql/tab-complete.c           |   3 +-
 src/include/access/heapam.h           |   3 +
 src/include/nodes/parsenodes.h        |   4 +-
 src/test/regress/expected/vacuum.out  |   2 +
 src/test/regress/sql/vacuum.sql       |   3 +
 14 files changed, 851 insertions(+), 128 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 6d42b7a..840eadd 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2209,13 +2209,13 @@ include_dir 'conf.d'
        <listitem>
         <para>
          Sets the maximum number of parallel workers that can be
-         started by a single utility command.  Currently, the only
-         parallel utility command that supports the use of parallel
-         workers is <command>CREATE INDEX</command>, and only when
-         building a B-tree index.  Parallel workers are taken from the
-         pool of processes established by <xref
-         linkend="guc-max-worker-processes"/>, limited by <xref
-         linkend="guc-max-parallel-workers"/>.  Note that the requested
+         started by a single utility command.  Currently, the parallel
+         utility commands that support the use of parallel workers are
+         <command>CREATE INDEX</command> only when building a B-tree index,
+         and <command>VACUUM</command> without <literal>FULL</literal>
+         option. Parallel workers are taken from the pool of processes
+         established by <xref linkend="guc-max-worker-processes"/>, limited
+         by <xref linkend="guc-max-parallel-workers"/>.  Note that the requested
          number of workers may not actually be available at run time.
          If this occurs, the utility operation will run with fewer
          workers than expected.  The default value is 2.  Setting this
diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index fd911f5..1d7a002 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -30,6 +30,7 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
     FREEZE
     VERBOSE
     ANALYZE
+    PARALLEL [ <replaceable class="parameter">N</replaceable> ]
     DISABLE_PAGE_SKIPPING
     SKIP_LOCKED
 
@@ -143,6 +144,25 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
    </varlistentry>
 
    <varlistentry>
+    <term><literal>PARALLEL <replaceable class="parameter">N</replaceable></literal></term>
+    <listitem>
+     <para>
+      Perform vacuum index and cleanup index phases of <command>VACUUM</command>
+      in parallel using <replaceable class="parameter">N</replaceable> background
+      workers (for the detail of each vacuum phases, please refer to
+      <xref linkend="vacuum-phases"/>). Only one worker can be used per index.
+      Workers for vacuum launches before starting each phases and exit at the end
+      of the phase. If the parallel degree
+      <replaceable class="parameter">N</replaceable> is omitted, then
+      <command>VACUUM</command> decides the number of workers based on number of
+      indexes on the relation which further limited by
+      <xref linkend="guc-max-parallel-workers-maintenance"/>. This option can not
+      use with  <literal>FULL</literal> option.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><literal>DISABLE_PAGE_SKIPPING</literal></term>
     <listitem>
      <para>
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 2c33bf6..5f1eed4 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -22,6 +22,19 @@
  * of index scans performed.  So we don't use maintenance_work_mem memory for
  * the TID array, just enough to hold as many heap tuples as fit on one page.
  *
+ * Lazy vacuum supports parallel execution with parallel worker processes. In
+ * parallel vacuum, we perform both index vacuuming and index cleanup in
+ * parallel. Individual indexes is processed by one vacuum process. At beginning
+ * of lazy vacuum (at lazy_scan_heap) we prepare the parallel context and
+ * initialize the shared memory segments that contains shared information as
+ * well as the memory space for dead tuples. When starting either index vacuuming
+ * or index cleanup, we launch parallel worker processes. Once all indexes are
+ * processed the parallel worker processes exit and the leader process
+ * re-initializes the shared memory segment. Note that all parallel workers live
+ * during one either index vacuuming or cleanup index but the leader process neither
+ * exits from the parallel mode nor destroys the parallel context. For updating
+ * the index statistics, since any updates are not allowed during parallel mode
+ * we update the index statistics after exited from the parallel mode.
  *
  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -41,8 +54,10 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/storage.h"
 #include "commands/dbcommands.h"
@@ -55,6 +70,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
+#include "tcop/tcopprot.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
@@ -110,6 +126,88 @@
  */
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
+/*
+ * DSM keys for parallel lazy vacuum. Since we don't need to worry about DSM
+ * keys conflicting with plan_node_id we can use small integers.
+ */
+#define PARALLEL_VACUUM_KEY_SHARED			1
+#define PARALLEL_VACUUM_KEY_DEAD_TUPLES		2
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
+
+/*
+ * Structs for an index bulk-deletion statistic that is used for parallel
+ * lazy vacuum. This is allocated in a dynamic shared memory segment.
+ */
+typedef struct LVIndStats
+{
+	bool updated;	/* are the stats updated? */
+	IndexBulkDeleteResult stats;
+} LVIndStats;
+
+/*
+ * LVDeadTuples stores the dead tuple TIDs collected during heap scan.
+ * This is allocated in a dynamic shared memory segment when parallel
+ * lazy vacuum mode, or allocated in a local memory.
+ */
+typedef struct LVDeadTuples
+{
+	int			max_tuples;	/* # slots allocated in array */
+	int			num_tuples;	/* current # of entries */
+	/* List of TIDs of tuples we intend to delete */
+	/* NB: this list is ordered by TID address */
+	ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER];	/* array of ItemPointerData */
+} LVDeadTuples;
+#define SizeOfLVDeadTuples offsetof(LVDeadTuples, itemptrs) + sizeof(ItemPointerData)
+
+/*
+ * Shared information among parallel workers. So this is allocated in
+ * a dynamic shared memory segment.
+ */
+typedef struct LVShared
+{
+	/*
+	 * Target table relid and vacuum settings. These fields are not modified
+	 * during the lazy vacuum.
+	 */
+	Oid		relid;
+	bool	is_wraparound;
+	int		elevel;
+
+	/*
+	 * An indication for vacuum workers of doing either vacuuming index or
+	 * cleanup index.
+	 */
+	bool	for_cleanup;
+
+	/*
+	 * Fields for vacuum index or cleanup index, or both necessary for
+	 * IndexVacuumInfo.
+	 *
+	 * reltuples is the total number of input heap tuples. We set either an
+	 * old live tuples in vacuum index or th new live tuples in cleanup index.
+	 *
+	 * estimated_count is true if the reltuples is estimated value.
+	 */
+	double	reltuples;
+	bool	estimated_count;
+
+	/*
+	 * Variables to control parallel index vacuuming. An variable-sized field
+	 * 'indstats' must come last.
+	 */
+	pg_atomic_uint32	nprocessed;
+	LVIndStats			indstats[FLEXIBLE_ARRAY_MEMBER];
+} LVShared;
+#define SizeOfLVShared offsetof(LVShared, indstats) + sizeof(LVIndStats)
+
+/* Struct for parallel lazy vacuum */
+typedef struct LVParallelState
+{
+	ParallelContext	*pcxt;
+	LVShared		*lvshared;
+	int				nworkers_requested;	/* user-requested parallel degree */
+} LVParallelState;
+
 typedef struct LVRelStats
 {
 	/* hasindex = true means two-pass strategy; false means one-pass */
@@ -128,17 +226,12 @@ typedef struct LVRelStats
 	BlockNumber pages_removed;
 	double		tuples_deleted;
 	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
-	/* List of TIDs of tuples we intend to delete */
-	/* NB: this list is ordered by TID address */
-	int			num_dead_tuples;	/* current # of entries */
-	int			max_dead_tuples;	/* # slots allocated in array */
-	ItemPointer dead_tuples;	/* array of ItemPointerData */
+	LVDeadTuples *dead_tuples;
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
 	bool		lock_waiter_detected;
 } LVRelStats;
 
-
 /* A few variables that don't seem worth passing around as parameters */
 static int	elevel = -1;
 
@@ -152,15 +245,17 @@ static BufferAccessStrategy vac_strategy;
 /* non-export function prototypes */
 static void lazy_scan_heap(Relation onerel, VacuumOptions *options,
 			   LVRelStats *vacrelstats, Relation *Irel, int nindexes,
-			   bool aggressive);
+			   bool aggressive, bool is_wraparound);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats, BlockNumber nblocks);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
-static void lazy_vacuum_index(Relation indrel,
-				  IndexBulkDeleteResult **stats,
-				  LVRelStats *vacrelstats);
-static void lazy_cleanup_index(Relation indrel,
-				   IndexBulkDeleteResult *stats,
-				   LVRelStats *vacrelstats);
+static IndexBulkDeleteResult *lazy_vacuum_index(Relation indrel,
+									IndexBulkDeleteResult *stats,
+									double reltuples,
+									LVDeadTuples	*dead_tuples);
+static IndexBulkDeleteResult *lazy_cleanup_index(Relation indrel,
+									IndexBulkDeleteResult *stats,
+									double reltuples, bool estimated_count,
+									bool update_stats);
 static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
 static bool should_attempt_truncation(LVRelStats *vacrelstats);
@@ -168,13 +263,27 @@ static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 						 LVRelStats *vacrelstats);
 static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
-static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
-					   ItemPointer itemptr);
+static void lazy_record_dead_tuple(LVDeadTuples *dead_tuples, ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(Relation rel, Buffer buf,
 						 TransactionId *visibility_cutoff_xid, bool *all_frozen);
-
+static LVParallelState *lazy_prepare_parallel(LVRelStats *vacrelstats, Oid relid,
+											  BlockNumber nblocks, int nindexes,
+											  int nrequested, bool is_wraparound);
+static void lazy_end_parallel(LVParallelState *lps, Relation *Irel, int nindexes,
+							  bool update_indstats);
+static bool lazy_begin_parallel_vacuum_index(LVParallelState *lps, LVRelStats *vacrelstats,
+											 bool for_cleanup);
+static void lazy_end_parallel_vacuum_index(LVParallelState *lps, bool for_cleanup);
+static void lazy_vacuum_all_indexes(LVRelStats *vacrelstats, Relation *Irel, int nindexes,
+									IndexBulkDeleteResult **stats,
+									LVParallelState *lps, bool for_cleanup);
+static void lazy_vacuum_indexes_for_worker(Relation *indrels, int nindexes,
+										   LVShared *lvshared, LVDeadTuples *dead_tuples,
+										   bool for_cleanup);
+static int compute_parallel_workers(Relation onerel, int nrequested, int nindexes);
+static long compute_max_dead_tuples(BlockNumber relblocks, bool hasindex);
 
 /*
  *	heap_vacuum_rel() -- perform VACUUM for one heap relation
@@ -261,7 +370,8 @@ heap_vacuum_rel(Relation onerel, VacuumOptions *options, VacuumParams *params,
 	vacrelstats->hasindex = (nindexes > 0);
 
 	/* Do the vacuuming */
-	lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive);
+	lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive,
+				   params->is_wraparound);
 
 	/* Done with indexes */
 	vac_close_indexes(nindexes, Irel, NoLock);
@@ -464,14 +574,29 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
  *		dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
  *		to reclaim dead line pointers.
  *
+ *		If the table has more than one index and parallel lazy vacuum is requested,
+ *		we execute both index vacuuming and cleanup index with parallel workers. When
+ *		allocating the space for lazy scan heap, we enter the parallel mode, create
+ *		the parallel context and initailize a dynamic shared memory segment for dead
+ *		tuples. The dead_tuples points either to a dynamic shared memory segment in
+ *		parallel vacuum case or to a local memory in single process vacuum case.
+ *		Before starting	parallel index vacuuming and parallel cleanup index we launch
+ *		parallel workers. All parallel workers will exit after processed all indexes
+ *		and the leader process re-initialize parallel context and then re-launch them
+ *		at the next execution. The index statistics are updated by the leader after
+ *		exited from the parallel mode since all writes are not allowed during the
+ *		parallel mode.
+ *
  *		If there are no indexes then we can reclaim line pointers on the fly;
  *		dead line pointers need only be retained until all index pointers that
  *		reference them have been killed.
  */
 static void
 lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
-			   Relation *Irel, int nindexes, bool aggressive)
+			   Relation *Irel, int nindexes, bool aggressive, bool is_wraparound)
 {
+	LVParallelState *lps = NULL;	/* non-NULL means ready for parallel vacuum */
+	LVDeadTuples *dead_tuples;
 	BlockNumber nblocks,
 				blkno;
 	HeapTupleData tuple;
@@ -486,7 +611,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 				tups_vacuumed,	/* tuples cleaned up by vacuum */
 				nkeep,			/* dead-but-not-removable tuples */
 				nunused;		/* unused item pointers */
-	IndexBulkDeleteResult **indstats;
+	IndexBulkDeleteResult **indstats = NULL;
 	int			i;
 	PGRUsage	ru0;
 	Buffer		vmbuffer = InvalidBuffer;
@@ -494,6 +619,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 	bool		skipping_blocks;
 	xl_heap_freeze_tuple *frozen;
 	StringInfoData buf;
+	int			parallel_workers = 0;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -519,9 +645,6 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 	next_fsm_block_to_vacuum = (BlockNumber) 0;
 	num_tuples = live_tuples = tups_vacuumed = nkeep = nunused = 0;
 
-	indstats = (IndexBulkDeleteResult **)
-		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
-
 	nblocks = RelationGetNumberOfBlocks(onerel);
 	vacrelstats->rel_pages = nblocks;
 	vacrelstats->scanned_pages = 0;
@@ -529,13 +652,47 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 	vacrelstats->nonempty_pages = 0;
 	vacrelstats->latestRemovedXid = InvalidTransactionId;
 
-	lazy_space_alloc(vacrelstats, nblocks);
+	/*
+	 * Compute the number of parallel vacuum worker to request and then enable
+	 * parallel lazy vacuum.
+	 */
+	if ((options->flags & VACOPT_PARALLEL) != 0)
+		parallel_workers = compute_parallel_workers(onerel,
+													options->nworkers,
+													nindexes);
+
+	if (parallel_workers > 0)
+	{
+		/* Enter the parallel mode and prepare parallel vacuum */
+		lps = lazy_prepare_parallel(vacrelstats,
+									RelationGetRelid(onerel),
+									nblocks, nindexes,
+									parallel_workers, is_wraparound);
+		lps->nworkers_requested = options->nworkers;
+	}
+	else
+	{
+		/* Allocate the memory space for dead tuples locally */
+		lazy_space_alloc(vacrelstats, nblocks);
+	}
+
+	dead_tuples = vacrelstats->dead_tuples;
 	frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
 
+	/*
+	 * Allocate the memory for index bulk-delete results if in the single vacuum
+	 * mode. In parallel mode, we've already prepared it in the shared memory
+	 * segment.
+	 */
+	if (!lps)
+		indstats = (IndexBulkDeleteResult **)
+			palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
+
+
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
 	initprog_val[1] = nblocks;
-	initprog_val[2] = vacrelstats->max_dead_tuples;
+	initprog_val[2] = dead_tuples->max_tuples;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
 	/*
@@ -713,8 +870,8 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 		 * If we are close to overrunning the available space for dead-tuple
 		 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
 		 */
-		if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
-			vacrelstats->num_dead_tuples > 0)
+		if ((dead_tuples->max_tuples - dead_tuples->num_tuples) < MaxHeapTuplesPerPage &&
+			dead_tuples->num_tuples > 0)
 		{
 			const int	hvp_index[] = {
 				PROGRESS_VACUUM_PHASE,
@@ -742,10 +899,8 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 										 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
 
 			/* Remove index entries */
-			for (i = 0; i < nindexes; i++)
-				lazy_vacuum_index(Irel[i],
-								  &indstats[i],
-								  vacrelstats);
+			lazy_vacuum_all_indexes(vacrelstats, Irel, nindexes, indstats,
+									lps, false);
 
 			/*
 			 * Report that we are now vacuuming the heap.  We also increase
@@ -765,7 +920,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 			 * not to reset latestRemovedXid since we want that value to be
 			 * valid.
 			 */
-			vacrelstats->num_dead_tuples = 0;
+			dead_tuples->num_tuples = 0;
 			vacrelstats->num_index_scans++;
 
 			/*
@@ -961,7 +1116,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 		has_dead_tuples = false;
 		nfrozen = 0;
 		hastup = false;
-		prev_dead_count = vacrelstats->num_dead_tuples;
+		prev_dead_count = dead_tuples->num_tuples;
 		maxoff = PageGetMaxOffsetNumber(page);
 
 		/*
@@ -1000,7 +1155,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 			 */
 			if (ItemIdIsDead(itemid))
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
 				all_visible = false;
 				continue;
 			}
@@ -1140,7 +1295,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 
 			if (tupgone)
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
 				HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
 													   &vacrelstats->latestRemovedXid);
 				tups_vacuumed += 1;
@@ -1209,8 +1364,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 		 * If there are no indexes then we can vacuum the page right now
 		 * instead of doing a second scan.
 		 */
-		if (nindexes == 0 &&
-			vacrelstats->num_dead_tuples > 0)
+		if (nindexes == 0 && dead_tuples->num_tuples > 0)
 		{
 			/* Remove tuples from heap */
 			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
@@ -1221,7 +1375,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 			 * not to reset latestRemovedXid since we want that value to be
 			 * valid.
 			 */
-			vacrelstats->num_dead_tuples = 0;
+			dead_tuples->num_tuples = 0;
 			vacuumed_pages++;
 
 			/*
@@ -1337,7 +1491,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 		 * page, so remember its free space as-is.  (This path will always be
 		 * taken if there are no indexes.)
 		 */
-		if (vacrelstats->num_dead_tuples == prev_dead_count)
+		if (dead_tuples->num_tuples == prev_dead_count)
 			RecordPageWithFreeSpace(onerel, blkno, freespace, nblocks);
 	}
 
@@ -1371,7 +1525,7 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 
 	/* If any tuples need to be deleted, perform final vacuum cycle */
 	/* XXX put a threshold on min number of tuples here? */
-	if (vacrelstats->num_dead_tuples > 0)
+	if (dead_tuples->num_tuples > 0)
 	{
 		const int	hvp_index[] = {
 			PROGRESS_VACUUM_PHASE,
@@ -1387,10 +1541,8 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 									 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
 
 		/* Remove index entries */
-		for (i = 0; i < nindexes; i++)
-			lazy_vacuum_index(Irel[i],
-							  &indstats[i],
-							  vacrelstats);
+		lazy_vacuum_all_indexes(vacrelstats, Irel, nindexes, indstats,
+								lps, false);
 
 		/* Report that we are now vacuuming the heap */
 		hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP;
@@ -1417,8 +1569,12 @@ lazy_scan_heap(Relation onerel, VacuumOptions *options, LVRelStats *vacrelstats,
 								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
 	/* Do post-vacuum cleanup and statistics update for each index */
-	for (i = 0; i < nindexes; i++)
-		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+	lazy_vacuum_all_indexes(vacrelstats, Irel, nindexes, indstats,
+							lps, true);
+
+	/* End parallel vacuum, update index statistics */
+	if (lps)
+		lazy_end_parallel(lps, Irel, nindexes, true);
 
 	/* If no indexes, make log report that lazy_vacuum_heap would've made */
 	if (vacuumed_pages)
@@ -1485,7 +1641,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats, BlockNumber nblocks)
 	npages = 0;
 
 	tupindex = 0;
-	while (tupindex < vacrelstats->num_dead_tuples)
+	while (tupindex < vacrelstats->dead_tuples->num_tuples)
 	{
 		BlockNumber tblk;
 		Buffer		buf;
@@ -1494,7 +1650,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats, BlockNumber nblocks)
 
 		vacuum_delay_point();
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples->itemptrs[tupindex]);
 		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
 								 vac_strategy);
 		if (!ConditionalLockBufferForCleanup(buf))
@@ -1503,8 +1659,8 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats, BlockNumber nblocks)
 			++tupindex;
 			continue;
 		}
-		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats,
-									&vmbuffer);
+		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex,
+									vacrelstats, &vmbuffer);
 
 		/* Now that we've compacted the page, record its available space */
 		page = BufferGetPage(buf);
@@ -1542,6 +1698,7 @@ static int
 lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 				 int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
 {
+	LVDeadTuples	*dead_tuples = vacrelstats->dead_tuples;
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber unused[MaxOffsetNumber];
 	int			uncnt = 0;
@@ -1552,16 +1709,16 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 
 	START_CRIT_SECTION();
 
-	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
+	for (; tupindex < dead_tuples->num_tuples; tupindex++)
 	{
 		BlockNumber tblk;
 		OffsetNumber toff;
 		ItemId		itemid;
 
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+		tblk = ItemPointerGetBlockNumber(&dead_tuples->itemptrs[tupindex]);
 		if (tblk != blkno)
 			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
+		toff = ItemPointerGetOffsetNumber(&dead_tuples->itemptrs[tupindex]);
 		itemid = PageGetItemId(page, toff);
 		ItemIdSetUnused(itemid);
 		unused[uncnt++] = toff;
@@ -1682,6 +1839,94 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
 	return false;
 }
 
+/*
+ * Vacuum or cleanup all indexes. If we're ready to do parallel vacuum it's
+ * performed with parallel workers. So this function must be used by the parallel
+ * vacuum leader process.
+ *
+ * In parallel lazy vacuum, we copy the index bulk-deletion results returned from
+ * ambulkdelete and amvacuumcleanup to the shared memory because they are allocated
+ * locally and it's possible that an index will be vacuumed by the different vacuum
+ * process at the next time.
+ *
+ * Since all vacuum workers write the bulk-delete result at different slot we can
+ * write them without locking.
+ */
+static void
+lazy_vacuum_all_indexes(LVRelStats *vacrelstats, Relation *Irel, int nindexes,
+						IndexBulkDeleteResult **stats, LVParallelState *lps,
+						bool for_cleanup)
+{
+	int			nprocessed = 0;
+	bool		do_parallel = false;
+	int			idx;
+
+	Assert(!IsParallelWorker());
+
+	/* no job if the table has no index */
+	if (nindexes <= 0)
+		return;
+
+	/* Launch parallel vacuum workers if we're ready */
+	if (lps)
+		do_parallel = lazy_begin_parallel_vacuum_index(lps, vacrelstats,
+													   for_cleanup);
+
+	for (;;)
+	{
+		IndexBulkDeleteResult *result = NULL;
+
+		/* Get the next index number to vacuum and set index statistics */
+		if (do_parallel)
+		{
+			idx = pg_atomic_fetch_add_u32(&(lps->lvshared->nprocessed), 1);
+
+			/*
+			 * If there is already-updated result in the shared memory we
+			 * use it. Otherwise we pass NULL ot index AMs as they expect
+			 * NULL for the first time execution.
+			 */
+			if (lps->lvshared->indstats[idx].updated)
+				result = &(lps->lvshared->indstats[idx].stats);
+		}
+		else
+		{
+			idx = nprocessed++;
+			result = stats[idx];
+		}
+
+		/* Done for all indexes? */
+		if (idx >= nindexes)
+			break;
+
+		/*
+		 * Do vacuuming or cleanup one index. For cleanup index, we don't update
+		 * index statistics during parallel mode.
+		 */
+		if (for_cleanup)
+			result = lazy_cleanup_index(Irel[idx], result,
+										vacrelstats->new_rel_tuples,
+										vacrelstats->tupcount_pages < vacrelstats->rel_pages,
+										!do_parallel);
+		else
+			result = lazy_vacuum_index(Irel[idx], result,
+									   vacrelstats->old_rel_pages,
+									   vacrelstats->dead_tuples);
+
+		if (do_parallel && result)
+		{
+			/* Save index bulk-deletion result to the shared memory space */
+			memcpy(&(lps->lvshared->indstats[idx].stats), result,
+				   sizeof(IndexBulkDeleteResult));
+
+			/* Set true to pass the saved results at the next time */
+			lps->lvshared->indstats[idx].updated = true;
+		}
+	}
+
+	if (do_parallel)
+		lazy_end_parallel_vacuum_index(lps, for_cleanup);
+}
 
 /*
  *	lazy_vacuum_index() -- vacuum one index relation.
@@ -1689,12 +1934,13 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
  *		Delete all the index entries pointing to tuples listed in
  *		vacrelstats->dead_tuples, and update running statistics.
  */
-static void
-lazy_vacuum_index(Relation indrel,
-				  IndexBulkDeleteResult **stats,
-				  LVRelStats *vacrelstats)
+static IndexBulkDeleteResult *
+lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult *stats,
+				  double reltuples, LVDeadTuples *dead_tuples)
 {
+	IndexBulkDeleteResult *res;
 	IndexVacuumInfo ivinfo;
+	char		*msg;
 	PGRUsage	ru0;
 
 	pg_rusage_init(&ru0);
@@ -1703,57 +1949,56 @@ lazy_vacuum_index(Relation indrel,
 	ivinfo.analyze_only = false;
 	ivinfo.estimated_count = true;
 	ivinfo.message_level = elevel;
-	/* We can only provide an approximate value of num_heap_tuples here */
-	ivinfo.num_heap_tuples = vacrelstats->old_live_tuples;
+	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vac_strategy;
 
 	/* Do bulk deletion */
-	*stats = index_bulk_delete(&ivinfo, *stats,
-							   lazy_tid_reaped, (void *) vacrelstats);
+	res = index_bulk_delete(&ivinfo, stats,
+							lazy_tid_reaped, (void *) dead_tuples);
 
+	if (IsParallelWorker())
+		msg = "scanned index \"%s\" to remove %d row versions by parallel vacuum worker";
+	else
+		msg = "scanned index \"%s\" to remove %d row versions";
 	ereport(elevel,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
+			(errmsg(msg,
 					RelationGetRelationName(indrel),
-					vacrelstats->num_dead_tuples),
+					dead_tuples->num_tuples),
 			 errdetail_internal("%s", pg_rusage_show(&ru0))));
+
+	return res;
 }
 
 /*
  *	lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
  */
-static void
-lazy_cleanup_index(Relation indrel,
-				   IndexBulkDeleteResult *stats,
-				   LVRelStats *vacrelstats)
+static IndexBulkDeleteResult *
+lazy_cleanup_index(Relation indrel, IndexBulkDeleteResult *stats,
+				   double reltuples, bool estimated_count, bool update_stats)
 {
 	IndexVacuumInfo ivinfo;
+	char		*msg;
 	PGRUsage	ru0;
 
 	pg_rusage_init(&ru0);
 
 	ivinfo.index = indrel;
 	ivinfo.analyze_only = false;
-	ivinfo.estimated_count = (vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+	ivinfo.estimated_count = estimated_count;
 	ivinfo.message_level = elevel;
-
-	/*
-	 * Now we can provide a better estimate of total number of surviving
-	 * tuples (we assume indexes are more interested in that than in the
-	 * number of nominally live tuples).
-	 */
-	ivinfo.num_heap_tuples = vacrelstats->new_rel_tuples;
+	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vac_strategy;
 
 	stats = index_vacuum_cleanup(&ivinfo, stats);
 
 	if (!stats)
-		return;
+		return NULL;
 
 	/*
 	 * Now update statistics in pg_class, but only if the index says the count
 	 * is accurate.
 	 */
-	if (!stats->estimated_count)
+	if (!stats->estimated_count && update_stats)
 		vac_update_relstats(indrel,
 							stats->num_pages,
 							stats->num_index_tuples,
@@ -1763,8 +2008,13 @@ lazy_cleanup_index(Relation indrel,
 							InvalidMultiXactId,
 							false);
 
+	if (IsParallelWorker())
+		msg = "index \"%s\" now contains %.0f row versions in %u pages, reported by parallel vacuum worker";
+	else
+		msg = "index \"%s\" now contains %.0f row versions in %u pages";
+
 	ereport(elevel,
-			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
+			(errmsg(msg,
 					RelationGetRelationName(indrel),
 					stats->num_index_tuples,
 					stats->num_pages),
@@ -1775,7 +2025,14 @@ lazy_cleanup_index(Relation indrel,
 					   stats->pages_deleted, stats->pages_free,
 					   pg_rusage_show(&ru0))));
 
-	pfree(stats);
+	if (update_stats)
+	{
+		/* Must not in parallel mode as the stats is allocated in DSM */
+		Assert(!IsInParallelMode());
+		pfree(stats);
+	}
+
+	return stats;
 }
 
 /*
@@ -2080,19 +2337,17 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 }
 
 /*
- * lazy_space_alloc - space allocation decisions for lazy vacuum
- *
- * See the comments at the head of this file for rationale.
+ * Return the maximum number of dead tuples we can record.
  */
-static void
-lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+static long
+compute_max_dead_tuples(BlockNumber relblocks, bool hasindex)
 {
 	long		maxtuples;
 	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
 	autovacuum_work_mem != -1 ?
 	autovacuum_work_mem : maintenance_work_mem;
 
-	if (vacrelstats->hasindex)
+	if (hasindex)
 	{
 		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
 		maxtuples = Min(maxtuples, INT_MAX);
@@ -2106,34 +2361,49 @@ lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
 		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
 	}
 	else
-	{
 		maxtuples = MaxHeapTuplesPerPage;
-	}
 
-	vacrelstats->num_dead_tuples = 0;
-	vacrelstats->max_dead_tuples = (int) maxtuples;
-	vacrelstats->dead_tuples = (ItemPointer)
-		palloc(maxtuples * sizeof(ItemPointerData));
+	return maxtuples;
+}
+
+/*
+ * lazy_space_alloc - space allocation decisions for lazy vacuum
+ *
+ * See the comments at the head of this file for rationale.
+ */
+static void
+lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+{
+	LVDeadTuples	*dead_tuples = NULL;
+	long		maxtuples;
+
+	maxtuples = compute_max_dead_tuples(relblocks, vacrelstats->hasindex);
+
+	dead_tuples = (LVDeadTuples *)
+		palloc(SizeOfLVDeadTuples + maxtuples * sizeof(ItemPointerData));
+	dead_tuples->num_tuples = 0;
+	dead_tuples->max_tuples = (int) maxtuples;
+
+	vacrelstats->dead_tuples = dead_tuples;
 }
 
 /*
  * lazy_record_dead_tuple - remember one deletable tuple
  */
 static void
-lazy_record_dead_tuple(LVRelStats *vacrelstats,
-					   ItemPointer itemptr)
+lazy_record_dead_tuple(LVDeadTuples *dead_tuples, ItemPointer itemptr)
 {
 	/*
 	 * The array shouldn't overflow under normal behavior, but perhaps it
 	 * could if we are given a really small maintenance_work_mem. In that
 	 * case, just forget the last few tuples (we'll get 'em next time).
 	 */
-	if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
+	if (dead_tuples->num_tuples < dead_tuples->max_tuples)
 	{
-		vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
-		vacrelstats->num_dead_tuples++;
+		dead_tuples->itemptrs[dead_tuples->num_tuples] = *itemptr;
+		dead_tuples->num_tuples++;
 		pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-									 vacrelstats->num_dead_tuples);
+									 dead_tuples->num_tuples);
 	}
 }
 
@@ -2147,12 +2417,12 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats,
 static bool
 lazy_tid_reaped(ItemPointer itemptr, void *state)
 {
-	LVRelStats *vacrelstats = (LVRelStats *) state;
+	LVDeadTuples	*dead_tuples = (LVDeadTuples *) state;
 	ItemPointer res;
 
 	res = (ItemPointer) bsearch((void *) itemptr,
-								(void *) vacrelstats->dead_tuples,
-								vacrelstats->num_dead_tuples,
+								(void *) dead_tuples->itemptrs,
+								dead_tuples->num_tuples,
 								sizeof(ItemPointerData),
 								vac_cmp_itemptr);
 
@@ -2300,3 +2570,390 @@ heap_page_is_all_visible(Relation rel, Buffer buf,
 
 	return all_visible;
 }
+
+/*
+ * Compute the number of parallel worker process to request. Vacuums can be executed
+ * in parallel if the table has more than one index since the parallel index vacuuming
+ * processes one index by one vacuum process. The relation size of table and indexes
+ * doesn't affect to the parallel degree.
+ */
+static int
+compute_parallel_workers(Relation onerel, int nrequested, int nindexes)
+{
+	int parallel_workers;
+
+	if (nindexes <= 1)
+		return 0;
+
+	if (nrequested)
+		parallel_workers = Min(nrequested, nindexes - 1);
+	else
+	{
+		/*
+		 * The parallel degree is neither requested nor set in relopts. Compute
+		 * it based on the number of indexes.
+		 */
+		parallel_workers = nindexes - 1;
+	}
+
+	/* cap by max_parallel_maintenace_workers */
+	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+
+	return parallel_workers;
+}
+
+/*
+ * Enter the parallel mode, allocate and initialize a DSM segment.
+ */
+static LVParallelState *
+lazy_prepare_parallel(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks,
+					  int nindexes, int nrequested, bool is_wraparound)
+{
+	LVParallelState *lps = (LVParallelState *) palloc(sizeof(LVParallelState));
+	LVShared	*shared;
+	ParallelContext *pcxt;
+	LVDeadTuples	*tidmap;
+	long	maxtuples;
+	char	*sharedquery;
+	Size	estshared;
+	Size	estdt;
+	int		querylen;
+	int		i;
+	int		keys = 0;
+
+	Assert(nrequested > 0);
+	Assert(nindexes > 0);
+
+	EnterParallelMode();
+	pcxt = CreateParallelContext("postgres", "heap_parallel_vacuum_main",
+								 nrequested, true);
+	lps->pcxt = pcxt;
+	Assert(pcxt->nworkers > 0);
+
+	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+	estshared = MAXALIGN(add_size(SizeOfLVShared,
+								  mul_size(sizeof(LVIndStats), nindexes)));
+	shm_toc_estimate_chunk(&pcxt->estimator, estshared);
+	keys++;
+
+	/* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */
+	maxtuples = compute_max_dead_tuples(nblocks, nindexes > 0);
+	estdt = MAXALIGN(add_size(sizeof(LVDeadTuples),
+							  mul_size(sizeof(ItemPointerData), maxtuples)));
+	shm_toc_estimate_chunk(&pcxt->estimator, estdt);
+	keys++;
+
+	shm_toc_estimate_keys(&pcxt->estimator, keys);
+
+	/* Finally, estimate VACUUM_KEY_QUERY_TEXT space */
+	querylen = strlen(debug_query_string);
+	shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* create the DSM */
+	InitializeParallelDSM(pcxt);
+
+	/* prepare shared information */
+	shared = (LVShared *) shm_toc_allocate(pcxt->toc, estshared);
+	shared->relid = relid;
+	shared->is_wraparound = is_wraparound;
+	shared->elevel = elevel;
+	pg_atomic_init_u32(&(shared->nprocessed), 0);
+
+	for (i = 0; i < nindexes; i++)
+	{
+		LVIndStats *s = &(shared->indstats[i]);
+		s->updated = false;
+		MemSet(&(s->stats), 0, sizeof(IndexBulkDeleteResult));
+	}
+
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
+	lps->lvshared = shared;
+
+	/* prepare the dead tuple space */
+	tidmap = (LVDeadTuples *) shm_toc_allocate(pcxt->toc, estdt);
+	tidmap->max_tuples = maxtuples;
+	tidmap->num_tuples = 0;
+	MemSet(tidmap->itemptrs, 0, sizeof(ItemPointerData) * maxtuples);
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, tidmap);
+	vacrelstats->dead_tuples = tidmap;
+
+	/* Store query string for workers */
+	sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+	memcpy(sharedquery, debug_query_string, querylen + 1);
+	sharedquery[querylen] = '\0';
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
+
+	return lps;
+}
+
+/*
+ * Shutdown workers, destroy parallel context, and end parallel mode. If
+ * 'update_indstats' is true, we copy statistics of all indexes before
+ * destroying the parallel context, and then update them after exit parallel
+ * mode.
+ */
+static void
+lazy_end_parallel(LVParallelState *lps, Relation *Irel, int nindexes,
+				  bool update_indstats)
+{
+	LVIndStats *copied_indstats = NULL;
+
+	Assert(!IsParallelWorker());
+
+	if (update_indstats)
+	{
+		Assert(Irel != NULL && nindexes > 0);
+		/* copy the index statistics to a temporary space */
+		copied_indstats = palloc(sizeof(LVIndStats) * nindexes);
+		memcpy(copied_indstats, lps->lvshared->indstats,
+			   sizeof(LVIndStats) * nindexes);
+	}
+
+	/* Shutdown worker processes and destroy the parallel context */
+	WaitForParallelWorkersToFinish(lps->pcxt);
+	DestroyParallelContext(lps->pcxt);
+	ExitParallelMode();
+
+	if (update_indstats)
+	{
+		int i;
+
+		for (i = 0; i < nindexes; i++)
+		{
+			LVIndStats *s = &(copied_indstats[i]);
+
+			/* Update index statistics */
+			if (s->updated && !s->stats.estimated_count)
+				vac_update_relstats(Irel[i],
+									s->stats.num_pages,
+									s->stats.num_index_tuples,
+									0,
+									false,
+									InvalidTransactionId,
+									InvalidMultiXactId,
+									false);
+		}
+
+		pfree(copied_indstats);
+	}
+}
+
+/*
+ * Begin a parallel index vacuuming or cleanup index. Set shared information
+ * and launch parallel worker processes. Return true if at least one worker
+ * has been launched.
+ */
+static bool
+lazy_begin_parallel_vacuum_index(LVParallelState *lps, LVRelStats *vacrelstats,
+								 bool for_cleanup)
+{
+	StringInfoData buf;
+
+	Assert(!IsParallelWorker());
+
+	/*
+	 * Request workers to do either vacuuming indexes or cleaning indexes.
+	 */
+	lps->lvshared->for_cleanup = for_cleanup;
+
+	if (for_cleanup)
+	{
+		/*
+		 * Now we can provide a better estimate of total number of surviving
+		 * tuples (we assume indexes are more interested in that than in the
+		 * number of nominally live tuples).
+		 */
+		lps->lvshared->reltuples = vacrelstats->new_rel_tuples;
+		lps->lvshared->estimated_count =
+			(vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+	}
+	else
+	{
+		/* We can only provide an approximate value of num_heap_tuples here */
+		lps->lvshared->reltuples = vacrelstats->old_live_tuples;
+		lps->lvshared->estimated_count = true;
+	}
+
+	LaunchParallelWorkers(lps->pcxt);
+
+	/* Report parallel vacuum worker information */
+	initStringInfo(&buf);
+	if (for_cleanup)
+	{
+		if (lps->nworkers_requested > 0)
+			appendStringInfo(&buf,
+							 ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d, requested %d)",
+									  "launched %d parallel vacuum workers for index cleanup (planned: %d, requsted %d)",
+									  lps->pcxt->nworkers_launched),
+							 lps->pcxt->nworkers_launched,
+							 lps->pcxt->nworkers,
+							 lps->nworkers_requested);
+		else
+			appendStringInfo(&buf,
+							 ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+									  "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+									  lps->pcxt->nworkers_launched),
+							 lps->pcxt->nworkers_launched,
+							 lps->pcxt->nworkers);
+	}
+	else
+	{
+		if (lps->nworkers_requested > 0)
+			appendStringInfo(&buf,
+							 ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d, requested %d)",
+									  "launched %d parallel vacuum workers for index vacuuming (planned: %d, requested %d)",
+									  lps->pcxt->nworkers_launched),
+							 lps->pcxt->nworkers_launched,
+							 lps->pcxt->nworkers,
+							 lps->nworkers_requested);
+		else
+			appendStringInfo(&buf,
+							 ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+									  "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+									  lps->pcxt->nworkers_launched),
+							 lps->pcxt->nworkers_launched,
+							 lps->pcxt->nworkers);
+	}
+	ereport(elevel, (errmsg("%s", buf.data)));
+
+	/*
+	 * if no workers launched, we vacuum all indexes by the leader process
+	 * alone. Since there is hope that we can launch workers in the next
+	 * execution time we don't want to end the parallel mode yet.
+	 */
+	if (lps->pcxt->nworkers_launched == 0)
+	{
+		lazy_end_parallel_vacuum_index(lps, for_cleanup);
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Wait for all worker processes to finish and reinitialize DSM for
+ * the next execution.
+ */
+static void
+lazy_end_parallel_vacuum_index(LVParallelState *lps, bool for_cleanup)
+{
+	Assert(!IsParallelWorker());
+
+	WaitForParallelWorkersToFinish(lps->pcxt);
+
+	/* Reset the processing count */
+	pg_atomic_write_u32(&(lps->lvshared->nprocessed), 0);
+
+	/*
+	 * Reinitialize the DSM space except to relaunch parallel workers for
+	 * the next execution.
+	 */
+	if (!for_cleanup)
+		ReinitializeParallelDSM(lps->pcxt);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ *
+ * Parallel vacuum worker processes doesn't report the vacuum progress
+ * information.
+ */
+void
+heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
+{
+	Relation	onerel;
+	Relation	*indrels;
+	LVShared	*lvshared;
+	LVDeadTuples	*dead_tuples;
+	int			nindexes;
+	char		*sharedquery;
+
+	/* Set lazy vacuum state and open relations */
+	lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
+	onerel = heap_open(lvshared->relid, ShareUpdateExclusiveLock);
+	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &indrels);
+	elevel = lvshared->elevel;
+
+	ereport(DEBUG1,
+			(errmsg("starting parallel lazy vacuum worker for %s",
+					lvshared->for_cleanup ? "cleanup" : "vacuuming")));
+
+	/* Set debug_query_string for individual workers */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
+
+	/* Report the query string from leader */
+	debug_query_string = sharedquery;
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/* Set dead tuple space within worker */
+	dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, false);
+
+	/* Set cost-based vacuum delay */
+	VacuumCostActive = (VacuumCostDelay > 0);
+	VacuumCostBalance = 0;
+	VacuumPageHit = 0;
+	VacuumPageMiss = 0;
+	VacuumPageDirty = 0;
+
+	/* Do either vacuuming indexes or cleaning indexes */
+	lazy_vacuum_indexes_for_worker(indrels, nindexes, lvshared,
+								   dead_tuples,
+								   lvshared->for_cleanup);
+
+	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
+	heap_close(onerel, ShareUpdateExclusiveLock);
+}
+
+/*
+ * Vacuum or cleanup indexes. This function must be used by the parallel vacuum
+ * worker processes. Similar to the leader process in parallel lazy vacuum, we
+ * copy the index bulk-deletion results to the shared memory segment.
+ */
+static void
+lazy_vacuum_indexes_for_worker(Relation *indrels, int nindexes,
+							   LVShared *lvshared, LVDeadTuples *dead_tuples,
+							   bool for_cleanup)
+{
+	int idx = 0;
+
+	Assert(IsParallelWorker());
+
+	for (;;)
+	{
+		IndexBulkDeleteResult *result = NULL;
+
+		/* Get next index to process */
+		idx = pg_atomic_fetch_add_u32(&(lvshared->nprocessed), 1);
+
+		/*  Done for all indexes? */
+		if (idx >= nindexes)
+			break;
+
+		/*
+		 * If we already have the result passed by the index AM, we pass
+		 * it. Otherwise pass NULL as it expects NULL for the first time
+		 * execution.
+		 */
+		if (lvshared->indstats[idx].updated)
+			result = &(lvshared->indstats[idx].stats);
+
+		/* Do vacuuming or cleanup one index */
+		if (for_cleanup)
+			result = lazy_cleanup_index(indrels[idx], result, lvshared->reltuples,
+									   lvshared->estimated_count, false);
+		else
+			result = lazy_vacuum_index(indrels[idx], result, lvshared->reltuples,
+									  dead_tuples);
+
+		if (result)
+		{
+			/* Save index bulk-deletion result to the shared memory space */
+			memcpy(&(lvshared->indstats[idx].stats), result,
+				   sizeof(IndexBulkDeleteResult));
+
+			/* Set true to pass the saved results at the next time */
+			lvshared->indstats[idx].updated = true;
+		}
+	}
+}
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index ce2b616..fb1e951 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
 
 #include "postgres.h"
 
+#include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -138,6 +139,9 @@ static const struct
 	},
 	{
 		"_bt_parallel_build_main", _bt_parallel_build_main
+	},
+	{
+		"heap_parallel_vacuum_main", heap_parallel_vacuum_main
 	}
 };
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 843f626..ec6efc4 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -111,6 +111,12 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
 		}
 	}
 
+	if ((vacstmt->options->flags & VACOPT_FULL) &&
+		(vacstmt->options->flags & VACOPT_PARALLEL))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot specify FULL option with PARALLEL option")));
+
 	/*
 	 * All freeze ages are zero if the FREEZE option is given; otherwise pass
 	 * them as -1 which means to use the default values.
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 7f937c9..1c4ac81 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3849,6 +3849,7 @@ _copyVacuumOptions(const VacuumOptions *from)
 	VacuumOptions *newnode = makeNode(VacuumOptions);
 
 	COPY_SCALAR_FIELD(flags);
+	COPY_SCALAR_FIELD(nworkers);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3dbbff4..0869a10 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1669,6 +1669,7 @@ static bool
 _equalVacuumOptions(const VacuumOptions *a, const VacuumOptions *b)
 {
 	COMPARE_SCALAR_FIELD(flags);
+	COMPARE_SCALAR_FIELD(nworkers);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index e7601da..f6acf5c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -187,7 +187,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 			   bool *deferrable, bool *initdeferred, bool *not_valid,
 			   bool *no_inherit, core_yyscan_t yyscanner);
 static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
-static VacuumOptions *makeVacOpt(VacuumFlag flags);
+static VacuumOptions *makeVacOpt(VacuumFlag flag, int nworkers);
 
 %}
 
@@ -10438,7 +10438,7 @@ cluster_index_specification:
 VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_analyze opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					VacuumOptions *opt = makeVacOpt(VACOPT_VACUUM);
+					VacuumOptions *opt = makeVacOpt(VACOPT_VACUUM, 0);
 					if ($2)
 						opt->flags |= VACOPT_FULL;
 					if ($3)
@@ -10454,8 +10454,10 @@ VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_analyze opt_vacuum_relati
 			| VACUUM '(' vacuum_option_list ')' opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = $3;
-					n->options->flags |= VACOPT_VACUUM;
+					VacuumOptions *opt = makeVacOpt(VACOPT_VACUUM, 0);
+					opt->flags = VACOPT_VACUUM | $3->flags;
+					opt->nworkers = $3->nworkers;
+					n->options = opt;
 					n->rels = $5;
 					$$ = (Node *) n;
 				}
@@ -10465,23 +10467,38 @@ vacuum_option_list:
 			vacuum_option_elem								{ $$ = $1; }
 			| vacuum_option_list ',' vacuum_option_elem
 				{
-					$1->flags |= $3->flags;
-					pfree($3);
-					$$ = $1;
+					VacuumOptions *opt1 = $1;
+					VacuumOptions *opt2 = $3;
+
+					opt1->flags |= opt2->flags;
+					if (opt2->flags == VACOPT_PARALLEL)
+						opt1->nworkers = opt2->nworkers;
+					pfree(opt2);
+					$$ = opt1;
 				}
 		;
 
 vacuum_option_elem:
-			analyze_keyword		{ $$ = makeVacOpt(VACOPT_ANALYZE); }
-			| VERBOSE			{ $$ = makeVacOpt(VACOPT_VERBOSE); }
-			| FREEZE			{ $$ = makeVacOpt(VACOPT_FREEZE); }
-			| FULL				{ $$ = makeVacOpt(VACOPT_FULL); }
+			analyze_keyword		{ $$ = makeVacOpt(VACOPT_ANALYZE, 0); }
+			| VERBOSE			{ $$ = makeVacOpt(VACOPT_VERBOSE, 0); }
+			| FREEZE			{ $$ = makeVacOpt(VACOPT_FREEZE, 0); }
+			| FULL				{ $$ = makeVacOpt(VACOPT_FULL, 0); }
+			| PARALLEL			{ $$ = makeVacOpt(VACOPT_PARALLEL, 0); }
+			| PARALLEL ICONST
+			{
+				if ($2 < 1)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("parallel vacuum degree must be more than 1"),
+							 parser_errposition(@1)));
+				$$ = makeVacOpt(VACOPT_PARALLEL, $2);
+			}
 			| IDENT
 				{
 					if (strcmp($1, "disable_page_skipping") == 0)
-						$$ = makeVacOpt(VACOPT_DISABLE_PAGE_SKIPPING);
+						$$ = makeVacOpt(VACOPT_DISABLE_PAGE_SKIPPING, 0);
 					else if (strcmp($1, "skip_locked") == 0)
-						$$ = makeVacOpt(VACOPT_SKIP_LOCKED);
+						$$ = makeVacOpt(VACOPT_SKIP_LOCKED, 0);
 					else
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
@@ -10493,7 +10510,8 @@ vacuum_option_elem:
 AnalyzeStmt: analyze_keyword opt_verbose opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					VacuumOptions *opt = makeVacOpt(VACOPT_ANALYZE);
+					VacuumOptions *opt = makeVacOpt(VACOPT_ANALYZE, 0);
+
 					if ($2)
 						opt->flags |= VACOPT_VERBOSE;
 					n->options = opt;
@@ -10503,7 +10521,9 @@ AnalyzeStmt: analyze_keyword opt_verbose opt_vacuum_relation_list
 			| analyze_keyword '(' analyze_option_list ')' opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options =  makeVacOpt(VACOPT_ANALYZE | $3);
+					VacuumOptions *opt = makeVacOpt(VACOPT_ANALYZE, 0);
+					opt->flags = VACOPT_ANALYZE | $3;
+					n->options = opt;
 					n->rels = $5;
 					$$ = (Node *) n;
 				}
@@ -16027,16 +16047,18 @@ makeXmlExpr(XmlExprOp op, char *name, List *named_args, List *args,
 	return (Node *) x;
 }
 
+
 /*
- * Create a VacuumOptions with the given flags.
+ * Create a VacuumOptions with the given options.
  */
 static VacuumOptions *
-makeVacOpt(const VacuumFlag flags)
+makeVacOpt(VacuumFlag flag, int nworkers)
 {
-	VacuumOptions *opt = makeNode(VacuumOptions);
+	VacuumOptions *vacopt = makeNode(VacuumOptions);
 
-	opt->flags = flags;
-	return opt;
+	vacopt->flags = flag;
+	vacopt->nworkers = nworkers;
+	return vacopt;
 }
 
 /*
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 525a33b..05898cf 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -2886,6 +2886,7 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
 			(dovacuum ? VACOPT_VACUUM : 0) |
 			(doanalyze ? VACOPT_ANALYZE : 0) |
 			(!wraparound ? VACOPT_SKIP_LOCKED : 0);
+		tab->at_vacoptions.nworkers = 0;	/* parallel lazy autovacuum is not supported */
 		tab->at_params.freeze_min_age = freeze_min_age;
 		tab->at_params.freeze_table_age = freeze_table_age;
 		tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 10ae21c..fef80c4 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3429,7 +3429,8 @@ psql_completion(const char *text, int start, int end)
 		 */
 		if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
 			COMPLETE_WITH("FULL", "FREEZE", "ANALYZE", "VERBOSE",
-						  "DISABLE_PAGE_SKIPPING", "SKIP_LOCKED");
+						  "DISABLE_PAGE_SKIPPING", "SKIP_LOCKED",
+						  "PARALLEL");
 	}
 	else if (HeadMatches("VACUUM") && TailMatches("("))
 		/* "VACUUM (" should be caught above, so assume we want columns */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 1c8525f..63e7a9e 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -14,12 +14,14 @@
 #ifndef HEAPAM_H
 #define HEAPAM_H
 
+#include "access/parallel.h"
 #include "access/relation.h"	/* for backward compatibility */
 #include "access/sdir.h"
 #include "access/skey.h"
 #include "access/table.h"		/* for backward compatibility */
 #include "nodes/parsenodes.h"
 #include "nodes/lockoptions.h"
+#include "nodes/parsenodes.h"
 #include "nodes/primnodes.h"
 #include "storage/bufpage.h"
 #include "storage/lockdefs.h"
@@ -188,6 +190,7 @@ extern Size SyncScanShmemSize(void);
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation onerel, VacuumOptions *options,
 				struct VacuumParams *params, BufferAccessStrategy bstrategy);
+extern void heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 278e5d1..46e7fff 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3163,13 +3163,15 @@ typedef enum VacuumFlag
 	VACOPT_FULL = 1 << 4,		/* FULL (non-concurrent) vacuum */
 	VACOPT_SKIP_LOCKED = 1 << 5,	/* skip if cannot get lock */
 	VACOPT_SKIPTOAST = 1 << 6,	/* don't process the TOAST table, if any */
-	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7	/* don't skip any pages */
+	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7,	/* don't skip any pages */
+	VACOPT_PARALLEL = 1 << 8	/* do lazy vacuum in parallel */
 } VacuumFlag;
 
 typedef struct VacuumOptions
 {
 	NodeTag		type;
 	int			flags; /* OR of VacuumFlag */
+	int			nworkers;	/* # of parallel vacuum workers */
 } VacuumOptions;
 
 /*
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index fa9d663..9b5b7dc 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -80,6 +80,8 @@ CONTEXT:  SQL function "do_analyze" statement 1
 SQL function "wrap_do_analyze" statement 1
 VACUUM FULL vactst;
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL) vaccluster;
+VACUUM (PARALLEL 2) vaccluster;
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
 CREATE TABLE vacparted1 PARTITION OF vacparted FOR VALUES IN (1);
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 9defa0d..f92c4e5 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -61,6 +61,9 @@ VACUUM FULL vaccluster;
 VACUUM FULL vactst;
 
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL) vaccluster;
+VACUUM (PARALLEL 2) vaccluster;
+
 
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
-- 
2.10.5

