diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1585861..94c8507 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2049,11 +2049,6 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
 	 * inserts in general except for the cases where inserts generate a new
 	 * CommandId (eg. inserts into a table having a foreign key column).
 	 */
-	if (IsParallelWorker())
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot insert tuples in a parallel worker")));
-
 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
 	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
 	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index af6afce..8c69931 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -517,6 +517,20 @@ GetCurrentFullTransactionIdIfAny(void)
 }
 
 /*
+ *	SetCurrentCommandIdUsedForWorker
+ *
+ * For a parallel worker, record that the currentCommandId has been used.
+ * This must only be called at the start of a parallel operation.
+ */
+void
+SetCurrentCommandIdUsedForWorker(void)
+{
+	Assert(IsParallelWorker() && !currentCommandIdUsed && currentCommandId != InvalidCommandId);
+
+	currentCommandIdUsed = true;
+}
+
+/*
  *	MarkCurrentTransactionIdLoggedIfAny
  *
  * Remember that the current xid - if it is assigned - now has been wal logged.
@@ -764,12 +778,13 @@ GetCurrentCommandId(bool used)
 	if (used)
 	{
 		/*
-		 * Forbid setting currentCommandIdUsed in a parallel worker, because
-		 * we have no provision for communicating this back to the leader.  We
-		 * could relax this restriction when currentCommandIdUsed was already
-		 * true at the start of the parallel operation.
+		 * In a parallel worker, only allow setting currentCommandIdUsed if
+		 * it was already true at the start of the parallel operation (as
+		 * recorded by SetCurrentCommandIdUsedForWorker()).  Otherwise, forbid
+		 * setting currentCommandIdUsed, because we have no provision for
+		 * communicating this back to the leader.
 		 */
-		Assert(!IsParallelWorker());
+		Assert(!(IsParallelWorker() && !currentCommandIdUsed));
 		currentCommandIdUsed = true;
 	}
 	return currentCommandId;
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 2e27e26..0f3bd82 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -173,7 +173,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 	 * against performing unsafe operations in parallel mode, but this gives a
 	 * more user-friendly error message.
 	 */
-	if ((XactReadOnly || IsInParallelMode()) &&
+	if ((XactReadOnly || (IsInParallelMode() && queryDesc->plannedstmt->commandType != CMD_INSERT)) &&
 		!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
 		ExecCheckXactReadOnly(queryDesc->plannedstmt);
 
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 382e78f..0e62554 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -23,6 +23,7 @@
 
 #include "postgres.h"
 
+#include "access/xact.h"
 #include "executor/execParallel.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
@@ -65,6 +66,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_PROCESSED_COUNT	UINT64CONST(0xE00000000000000B)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
@@ -173,18 +175,20 @@ ExecSerializePlan(Plan *plan, EState *estate)
 	 * PlannedStmt to start the executor.
 	 */
 	pstmt = makeNode(PlannedStmt);
-	pstmt->commandType = CMD_SELECT;
+	Assert(estate->es_plannedstmt->commandType == CMD_SELECT ||
+			estate->es_plannedstmt->commandType == CMD_INSERT);
+	pstmt->commandType = IsA(plan, ModifyTable) ? CMD_INSERT : CMD_SELECT;
 	pstmt->queryId = UINT64CONST(0);
-	pstmt->hasReturning = false;
-	pstmt->hasModifyingCTE = false;
+	pstmt->hasReturning = estate->es_plannedstmt->hasReturning;
+	pstmt->hasModifyingCTE = estate->es_plannedstmt->hasModifyingCTE;
 	pstmt->canSetTag = true;
 	pstmt->transientPlan = false;
 	pstmt->dependsOnRole = false;
 	pstmt->parallelModeNeeded = false;
 	pstmt->planTree = plan;
 	pstmt->rtable = estate->es_range_table;
-	pstmt->resultRelations = NIL;
-	pstmt->rootResultRelations = NIL;
+	pstmt->resultRelations = estate->es_plannedstmt->resultRelations;
+	pstmt->rootResultRelations = estate->es_plannedstmt->rootResultRelations;
 	pstmt->appendRelations = NIL;
 
 	/*
@@ -591,6 +595,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	char	   *paramlistinfo_space;
 	BufferUsage *bufusage_space;
 	WalUsage   *walusage_space;
+	uint64	   *processed_count_space;
 	SharedExecutorInstrumentation *instrumentation = NULL;
 	SharedJitInstrumentation *jit_instrumentation = NULL;
 	int			pstmt_len;
@@ -676,6 +681,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	if (IsA(planstate->plan, ModifyTable))
+	{
+		/* Estimate space for returned "# of tuples processed" count. */
+		shm_toc_estimate_chunk(&pcxt->estimator,
+							   mul_size(sizeof(uint64), pcxt->nworkers));
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
 	/*
 	 * Give parallel-aware nodes a chance to add to the estimates, and get a
 	 * count of how many PlanState nodes there are.
@@ -765,6 +778,19 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	/* We don't need the TupleQueueReaders yet, though. */
 	pei->reader = NULL;
 
+	if (IsA(planstate->plan, ModifyTable))
+	{
+		/* Allocate space for each worker's returned "# of tuples processed" count. */
+		processed_count_space = shm_toc_allocate(pcxt->toc,
+											mul_size(sizeof(uint64), pcxt->nworkers));
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_PROCESSED_COUNT, processed_count_space);
+		pei->processed_count = processed_count_space;
+	}
+	else
+	{
+		pei->processed_count = NULL;
+	}
+
 	/*
 	 * If instrumentation options were supplied, allocate space for the data.
 	 * It only gets partially initialized here; the rest happens during
@@ -1153,6 +1179,16 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	for (i = 0; i < nworkers; i++)
 		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
 
+	/*
+	 * Update total # of tuples processed, using counts from each worker.
+	 * This is currently done only in the case of parallel INSERT.
+	 */
+	if (pei->processed_count != NULL)
+	{
+		for (i = 0; i < nworkers; i++)
+			pei->planstate->state->es_processed += pei->processed_count[i];
+	}
+
 	pei->finished = true;
 }
 
@@ -1380,6 +1416,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
+	uint64   *processed_count;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
 	SharedExecutorInstrumentation *instrumentation;
@@ -1401,6 +1438,16 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 										 true);
 	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
 
+	Assert(queryDesc->operation == CMD_SELECT || queryDesc->operation == CMD_INSERT);
+	if (queryDesc->operation == CMD_INSERT)
+	{
+		/*
+		 * At the start of the parallel operation, record that the
+		 * CurrentCommandId has been used.
+		 */
+		SetCurrentCommandIdUsedForWorker();
+	}
+
 	/* Setting debug_query_string for individual workers */
 	debug_query_string = queryDesc->sourceText;
 
@@ -1459,6 +1506,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
 						  &wal_usage[ParallelWorkerNumber]);
 
+	if (queryDesc->operation == CMD_INSERT)
+	{
+		/* Report the # of tuples processed during parallel INSERT execution. */
+		processed_count = shm_toc_lookup(toc, PARALLEL_KEY_PROCESSED_COUNT, false);
+		processed_count[ParallelWorkerNumber] = queryDesc->estate->es_processed;
+	}
+
 	/* Report instrumentation data if any instrumentation options are set. */
 	if (instrumentation != NULL)
 		ExecParallelReportInstrumentation(queryDesc->planstate,
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index a01b46a..7c2aa52 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -35,6 +35,7 @@
 #include "executor/execdebug.h"
 #include "executor/execParallel.h"
 #include "executor/nodeGather.h"
+#include "executor/nodeModifyTable.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
 #include "miscadmin.h"
@@ -60,6 +61,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	GatherState *gatherstate;
 	Plan	   *outerNode;
 	TupleDesc	tupDesc;
+	Index		varno;
 
 	/* Gather node doesn't have innerPlan node. */
 	Assert(innerPlan(node) == NULL);
@@ -104,7 +106,9 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * Initialize result type and projection.
 	 */
 	ExecInitResultTypeTL(&gatherstate->ps);
-	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
+	varno = (IsA(outerNode, ModifyTable) && castNode(ModifyTable, outerNode)->returningLists != NULL) ?
+					castNode(ModifyTableState, outerPlanState(gatherstate))->resultRelInfo->ri_RangeTableIndex : OUTER_VAR;
+	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, varno);
 
 	/*
 	 * Without projections result slot type is not trivially known, see
@@ -144,9 +148,19 @@ ExecGather(PlanState *pstate)
 	GatherState *node = castNode(GatherState, pstate);
 	TupleTableSlot *slot;
 	ExprContext *econtext;
+	ModifyTableState *nodeModifyTableState = NULL;
+	bool isParallelInsertLeader = false;
+	bool isParallelInsertWithReturning = false;
 
 	CHECK_FOR_INTERRUPTS();
 
+	if (IsA(outerPlanState(pstate), ModifyTableState))
+	{
+		nodeModifyTableState = castNode(ModifyTableState, outerPlanState(pstate));
+		isParallelInsertLeader = nodeModifyTableState->operation == CMD_INSERT;
+		isParallelInsertWithReturning = isParallelInsertLeader && nodeModifyTableState->ps.plan->targetlist != NIL;
+	}
+
 	/*
 	 * Initialize the parallel context and workers on first execution. We do
 	 * this on first execution rather than during node initialization, as it
@@ -166,6 +180,28 @@ ExecGather(PlanState *pstate)
 		{
 			ParallelContext *pcxt;
 
+			/*
+			 * For parallel INSERT, assign the FullTransactionId and CurrentCommandId
+			 * here, to be included in the transaction state serialized in the
+			 * parallel DSM; parallel mode must be temporarily exited to allow this.
+			 * For parallel SELECT under a non-parallel INSERT, similarly assign the
+			 * FullTransactionId here, to avoid the INSERT attempting to assign it
+			 * while in parallel mode.
+			 */
+			if (isParallelInsertLeader || estate->es_plannedstmt->commandType == CMD_INSERT)
+			{
+				/*
+				 * Assign FullTransactionId and CurrentCommandId, to be
+				 * included in the transaction state that is serialized in the DSM.
+				 */
+				if (isParallelInsertLeader)
+					GetCurrentCommandId(true);
+				Assert(IsInParallelMode());
+				ExitParallelMode();
+				GetCurrentFullTransactionId();
+				EnterParallelMode();
+			}
+
 			/* Initialize, or re-initialize, shared state needed by workers. */
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
@@ -178,6 +214,25 @@ ExecGather(PlanState *pstate)
 										 node->pei,
 										 gather->initParam);
 
+			if (isParallelInsertLeader)
+			{
+				/* For parallel INSERT, any BEFORE STATEMENT triggers must be
+				 * fired by the leader, not by the parallel workers.
+				 */
+				if (nodeModifyTableState->fireBSTriggers)
+				{
+					fireBSTriggers(nodeModifyTableState);
+					nodeModifyTableState->fireBSTriggers = false;
+
+					/*
+					 * Disable firing of AFTER STATEMENT triggers by local
+					 * plan execution (ModifyTable processing); these will
+					 * instead be fired at the end of Gather processing.
+					 */
+					nodeModifyTableState->fireASTriggers = false;
+				}
+			}
+
 			/*
 			 * Register backend workers. We might not get as many as we
 			 * requested, or indeed any at all.
@@ -188,7 +243,7 @@ ExecGather(PlanState *pstate)
 			node->nworkers_launched = pcxt->nworkers_launched;
 
 			/* Set up tuple queue readers to read the results. */
-			if (pcxt->nworkers_launched > 0)
+			if (pcxt->nworkers_launched > 0 && !(isParallelInsertLeader && !isParallelInsertWithReturning))
 			{
 				ExecParallelCreateReaders(node->pei);
 				/* Make a working array showing the active readers */
@@ -200,7 +255,10 @@ ExecGather(PlanState *pstate)
 			}
 			else
 			{
-				/* No workers?	Then never mind. */
+				/*
+				 * Either no workers were launched, or this is a parallel
+				 * INSERT without a RETURNING clause; no readers are needed.
+				 */
 				node->nreaders = 0;
 				node->reader = NULL;
 			}
@@ -208,7 +266,7 @@ ExecGather(PlanState *pstate)
 		}
 
 		/* Run plan locally if no workers or enabled and not single-copy. */
-		node->need_to_scan_locally = (node->nreaders == 0)
+		node->need_to_scan_locally = (node->nworkers_launched <= 0)
 			|| (!gather->single_copy && parallel_leader_participation);
 		node->initialized = true;
 	}
@@ -418,14 +476,25 @@ ExecShutdownGatherWorkers(GatherState *node)
 void
 ExecShutdownGather(GatherState *node)
 {
-	ExecShutdownGatherWorkers(node);
+	if (node->pei == NULL)
+		return;
 
-	/* Now destroy the parallel context. */
-	if (node->pei != NULL)
+	bool isParallelInsertLeader = IsA(outerPlanState(node), ModifyTableState) &&
+									castNode(ModifyTableState, outerPlanState(node))->operation == CMD_INSERT;
+	if (isParallelInsertLeader)
 	{
-		ExecParallelCleanup(node->pei);
-		node->pei = NULL;
+		/* For parallel INSERT, any AFTER STATEMENT triggers must be fired
+		 * by the leader, not by the parallel workers.
+		 */
+		ModifyTableState *nodeMTS = castNode(ModifyTableState, outerPlanState(node));
+		fireASTriggers(nodeMTS);
 	}
+
+	ExecShutdownGatherWorkers(node);
+
+	/* Now destroy the parallel context. */
+	ExecParallelCleanup(node->pei);
+	node->pei = NULL;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 4712934..cc197dd 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -210,6 +210,21 @@ ExecGatherMerge(PlanState *pstate)
 		{
 			ParallelContext *pcxt;
 
+			if (estate->es_plannedstmt->commandType == CMD_INSERT)
+			{
+				/*
+				 * The underlying parallel query puts us in parallel mode,
+				 * and an INSERT must not attempt to assign a
+				 * FullTransactionId while that mode is in effect.  So the
+				 * FullTransactionId is assigned here instead, temporarily
+				 * exiting parallel mode to make that possible.
+				 */
+				Assert(IsInParallelMode());
+				ExitParallelMode();
+				GetCurrentFullTransactionId();
+				EnterParallelMode();
+			}
+
 			/* Initialize, or re-initialize, shared state needed by workers. */
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 9812089..1e97974 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -39,6 +39,7 @@
 
 #include "access/heapam.h"
 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
@@ -1734,7 +1735,7 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
 /*
  * Process BEFORE EACH STATEMENT triggers
  */
-static void
+void
 fireBSTriggers(ModifyTableState *node)
 {
 	ModifyTable *plan = (ModifyTable *) node->ps.plan;
@@ -1793,7 +1794,7 @@ getTargetResultRelInfo(ModifyTableState *node)
 /*
  * Process AFTER EACH STATEMENT triggers
  */
-static void
+void
 fireASTriggers(ModifyTableState *node)
 {
 	ModifyTable *plan = (ModifyTable *) node->ps.plan;
@@ -2281,7 +2282,11 @@ ExecModifyTable(PlanState *pstate)
 	/*
 	 * We're done, but fire AFTER STATEMENT triggers before exiting.
 	 */
-	fireASTriggers(node);
+	if (node->fireASTriggers)
+	{
+		fireASTriggers(node);
+		node->fireASTriggers = false;
+	}
 
 	node->mt_done = true;
 
@@ -2335,7 +2340,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
 	/* set up epqstate with dummy subplan data for the moment */
 	EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam);
-	mtstate->fireBSTriggers = true;
+	/* Statement-level triggers must not be fired by parallel workers */
+	mtstate->fireBSTriggers = !IsParallelWorker();
+	mtstate->fireASTriggers = !IsParallelWorker();
 
 	/*
 	 * call ExecInitNode on each of the plans to be executed and save the
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index cd3716d..7786624 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -179,6 +179,7 @@ static void set_rel_width(PlannerInfo *root, RelOptInfo *rel);
 static double relation_byte_size(double tuples, int width);
 static double page_size(double tuples, int width);
 static double get_parallel_divisor(Path *path);
+static double get_modifytable_parallel_divisor(ModifyTablePath *path);
 
 
 /*
@@ -203,6 +204,66 @@ clamp_row_est(double nrows)
 
 
 /*
+ * cost_modifytable
+ *    Determines and returns the cost of a ModifyTable node.
+ */
+void
+cost_modifytable(ModifyTablePath *path)
+{
+	double      total_size;
+	double      total_rows;
+	ListCell   *lc;
+
+	/*
+	 * Compute cost & rowcount as sum of subpath costs & rowcounts.
+	 */
+	path->path.startup_cost = 0;
+	path->path.total_cost = 0;
+	path->path.rows = 0;
+	total_size = 0;
+	total_rows = 0;
+	foreach(lc, path->subpaths)
+	{
+		Path       *subpath = (Path *) lfirst(lc);
+
+		if (lc == list_head(path->subpaths))  /* first node? */
+			path->path.startup_cost = subpath->startup_cost;
+		path->path.total_cost += subpath->total_cost;
+		total_rows += subpath->rows;
+		total_size += subpath->pathtarget->width * subpath->rows;
+	}
+
+	/* Adjust costing for parallelism, if used. */
+	if (path->path.parallel_workers > 0)
+	{
+		double	parallel_divisor = get_modifytable_parallel_divisor(path);
+
+		/* The total cost is divided among all the workers. */
+		path->path.total_cost /= parallel_divisor;
+
+		/*
+		 * In the case of a parallel plan, the row count needs to represent
+		 * the number of tuples processed per worker.
+		 */
+		path->path.rows = clamp_row_est(total_rows / parallel_divisor);
+	}
+	else
+	{
+		path->path.rows = total_rows;
+	}
+
+	/*
+	 * Set width to the average width of the subpath outputs.  XXX this is
+	 * totally wrong: we should report zero if no RETURNING, else an average
+	 * of the RETURNING tlist widths.  But it's what happened historically,
+	 * and improving it is a task for another day.
+	 */
+	if (total_rows > 0)
+		total_size /= total_rows;
+	path->path.pathtarget->width = rint(total_size);
+}
+
+/*
  * cost_seqscan
  *	  Determines and returns the cost of scanning a relation sequentially.
  *
@@ -383,7 +444,21 @@ cost_gather(GatherPath *path, PlannerInfo *root,
 
 	/* Parallel setup and communication cost. */
 	startup_cost += parallel_setup_cost;
-	run_cost += parallel_tuple_cost * path->path.rows;
+
+	/*
+	 * For parallel INSERT, if no tuples are returned from the workers to
+	 * the Gather (leader) node, don't charge a per-row cost: each worker
+	 * inserts, in parallel, the tuples produced by its own portion of the
+	 * plan execution.  Omitting this cost can make the parallel plan
+	 * cheaper than the alternatives, and so encourage the planner to
+	 * choose it.
+	 */
+	if (!(IsA(path->subpath, ModifyTablePath) &&
+		castNode(ModifyTablePath, path->subpath)->operation == CMD_INSERT &&
+		castNode(ModifyTablePath, path->subpath)->returningLists == NIL))
+	{
+		run_cost += parallel_tuple_cost * path->path.rows;
+	}
 
 	path->path.startup_cost = startup_cost;
 	path->path.total_cost = (startup_cost + run_cost);
@@ -5737,6 +5812,29 @@ get_parallel_divisor(Path *path)
 }
 
 /*
+ * Divisor for ModifyTable (currently only parallel INSERT).
+ * Estimate the fraction of the work that each worker will do, given the
+ * number of workers budgeted for the path.
+ * XXX This may need revising in light of further experience.
+ */
+static double
+get_modifytable_parallel_divisor(ModifyTablePath *path)
+{
+	double		parallel_divisor = path->path.parallel_workers;
+
+	if (parallel_leader_participation && path->returningLists != NIL)
+	{
+		double		leader_contribution;
+
+		leader_contribution = 1.0 - (0.3 * path->path.parallel_workers);
+		if (leader_contribution > 0)
+			parallel_divisor += leader_contribution;
+	}
+
+	return parallel_divisor;
+}
+
+/*
  * compute_bitmap_pages
  *
  * compute number of pages fetched from heap in bitmap heap scan.
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 3d7a4e3..825896b 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -339,7 +339,7 @@ create_plan(PlannerInfo *root, Path *best_path)
 	 * top-level tlist seen at execution time.  However, ModifyTable plan
 	 * nodes don't have a tlist matching the querytree targetlist.
 	 */
-	if (!IsA(plan, ModifyTable))
+	if (!IsA(plan, ModifyTable) && !(IsA(plan, Gather) && IsA(outerPlan(plan), ModifyTable)))
 		apply_tlist_labeling(plan->targetlist, root->processed_tlist);
 
 	/*
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index f331f82..9573a28 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -28,6 +28,7 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "foreign/fdwapi.h"
@@ -58,6 +59,7 @@
 #include "parser/parse_agg.h"
 #include "parser/parsetree.h"
 #include "partitioning/partdesc.h"
+#include "rewrite/rewriteHandler.h"
 #include "rewrite/rewriteManip.h"
 #include "storage/dsm_impl.h"
 #include "utils/lsyscache.h"
@@ -337,7 +339,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	 */
 	if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
 		IsUnderPostmaster &&
-		parse->commandType == CMD_SELECT &&
+		(parse->commandType == CMD_SELECT || parse->commandType == CMD_INSERT) &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
 		!IsParallelWorker())
@@ -371,6 +373,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	 * parallel-unsafe, or else the query planner itself has a bug.
 	 */
 	glob->parallelModeNeeded = glob->parallelModeOK &&
+		(parse->commandType == CMD_SELECT) &&
 		(force_parallel_mode != FORCE_PARALLEL_OFF);
 
 	/* Determine what fraction of the plan is likely to be scanned */
@@ -425,7 +428,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	 * Optionally add a Gather node for testing purposes, provided this is
 	 * actually a safe thing to do.
 	 */
-	if (force_parallel_mode != FORCE_PARALLEL_OFF && top_plan->parallel_safe)
+	if (force_parallel_mode != FORCE_PARALLEL_OFF && parse->commandType == CMD_SELECT && top_plan->parallel_safe)
 	{
 		Gather	   *gather = makeNode(Gather);
 
@@ -1797,7 +1800,8 @@ inheritance_planner(PlannerInfo *root)
 									 returningLists,
 									 rowMarks,
 									 NULL,
-									 assign_special_exec_param(root)));
+									 assign_special_exec_param(root),
+									 0));
 }
 
 /*--------------------
@@ -1845,6 +1849,7 @@ grouping_planner(PlannerInfo *root, bool inheritance_update,
 	RelOptInfo *final_rel;
 	FinalPathExtraData extra;
 	ListCell   *lc;
+	int parallel_insert_partial_path_count = 0;
 
 	/* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */
 	if (parse->limitCount || parse->limitOffset)
@@ -2381,13 +2386,102 @@ grouping_planner(PlannerInfo *root, bool inheritance_update,
 										returningLists,
 										rowMarks,
 										parse->onConflict,
-										assign_special_exec_param(root));
+										assign_special_exec_param(root),
+										0);
 		}
 
 		/* And shove it into final_rel */
 		add_path(final_rel, path);
 	}
 
+	/* Consider Parallel INSERT */
+	if (parse->commandType == CMD_INSERT &&
+		 !inheritance_update &&
+		 final_rel->consider_parallel &&
+		 parse->rowMarks == NIL)
+	{
+		Index		rootRelation;
+		List	   *withCheckOptionLists;
+		List	   *returningLists;
+		int			parallelInsertWorkers;
+
+		/*
+		 * Generate partial paths for final_rel: add Limit and/or ModifyTable
+		 * steps on top of each surviving partial path, as needed.
+		 */
+		foreach(lc, current_rel->partial_pathlist)
+		{
+			Path	   *path = (Path *) lfirst(lc);
+
+			/*
+			 * If there is a LIMIT/OFFSET clause, add the LIMIT node.
+			 */
+			if (limit_needed(parse))
+			{
+				path = (Path *) create_limit_path(root, final_rel, path,
+												  parse->limitOffset,
+												  parse->limitCount,
+												  parse->limitOption,
+												  offset_est, count_est);
+			}
+
+			/*
+			 * Add the ModifyTable node.
+			 */
+
+			/*
+			 * If target is a partition root table, we need to mark the
+			 * ModifyTable node appropriately for that.
+			 */
+			if (rt_fetch(parse->resultRelation, parse->rtable)->relkind ==
+				RELKIND_PARTITIONED_TABLE)
+				rootRelation = parse->resultRelation;
+			else
+				rootRelation = 0;
+
+			/*
+			 * Set up the WITH CHECK OPTION and RETURNING lists-of-lists, if
+			 * needed.
+			 */
+			if (parse->withCheckOptions)
+				withCheckOptionLists = list_make1(parse->withCheckOptions);
+			else
+				withCheckOptionLists = NIL;
+
+			if (parse->returningList)
+				returningLists = list_make1(parse->returningList);
+			else
+				returningLists = NIL;
+
+			/*
+			 * For the number of workers to use for a parallel INSERT, it
+			 * seems reasonable to use the same number of workers as estimated
+			 * for the underlying query.
+			 */
+			parallelInsertWorkers = path->parallel_workers;
+
+			path = (Path *)
+				create_modifytable_path(root, final_rel,
+										parse->commandType,
+										parse->canSetTag,
+										parse->resultRelation,
+										rootRelation,
+										false,
+										list_make1_int(parse->resultRelation),
+										list_make1(path),
+										list_make1(root),
+										withCheckOptionLists,
+										returningLists,
+										root->rowMarks,
+										parse->onConflict,
+										assign_special_exec_param(root),
+										parallelInsertWorkers);
+
+			add_partial_path(final_rel, path);
+			parallel_insert_partial_path_count++;
+		}
+	}
+
 	/*
 	 * Generate partial paths for final_rel, too, if outer query levels might
 	 * be able to make use of them.
@@ -2404,6 +2498,12 @@ grouping_planner(PlannerInfo *root, bool inheritance_update,
 		}
 	}
 
+	if (parallel_insert_partial_path_count > 0)
+	{
+		final_rel->rows = current_rel->rows;	/* XXX why hasn't this been set somewhere above? */
+		generate_useful_gather_paths(root, final_rel, false);
+	}
+
 	extra.limit_needed = limit_needed(parse);
 	extra.limit_tuples = limit_tuples;
 	extra.count_est = count_est;
@@ -7355,6 +7455,163 @@ can_partial_agg(PlannerInfo *root, const AggClauseCosts *agg_costs)
 }
 
 /*
+ * IsTriggerDataParallelInsertSafe
+ *
+ * Checks if the specified trigger data is parallel safe.
+ * Returns false if any of the triggers is not safe for
+ * parallel INSERT.
+ */
+static pg_attribute_always_inline bool
+IsTriggerDataParallelInsertSafe(TriggerDesc *trigdesc)
+{
+	int	i;
+
+	/*
+	 * We can't support execution of the following trigger types during
+	 * an insert performed by parallel workers:
+	 * - before/after statement triggers
+	 * - before/after row triggers
+	 * - instead-of triggers
+	 * - transition table triggers
+	 *
+	 * Note, however, that for parallel INSERT any before/after INSERT
+	 * statement triggers are fired only by the leader (never by the
+	 * workers), so those trigger types are not checked here.
+	 */
+	if (trigdesc != NULL &&
+		 (trigdesc->trig_insert_instead_row ||
+		  trigdesc->trig_insert_before_row ||
+		  trigdesc->trig_insert_after_row ||
+		  trigdesc->trig_insert_new_table))
+	{
+		return false;
+	}
+
+	for (i = 0; i < trigdesc->numtriggers; i++)
+	{
+		Trigger    *trigger = &trigdesc->triggers[i];
+		int 		trigtype;
+
+		if (func_parallel(trigger->tgfoid) != PROPARALLEL_SAFE)
+			return false;
+
+		/*
+		 * Trigger type RI_TRIGGER_FK indicates a foreign key on the relation;
+		 * that is not parallel-safe for insert, because the FK checks would
+		 * create new CommandIds, which isn't supported in parallel workers.
+		 */
+		trigtype = RI_FKey_trigger_type(trigger->tgfoid);
+		if (trigtype == RI_TRIGGER_FK)
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * IsParallelInsertSafe
+ *
+ * Determines whether a specified INSERT statement is parallel safe.
+ */
+static bool
+IsParallelInsertSafe(Query *parse)
+{
+	Relation        rel;
+	RangeTblEntry   *rte;
+	TupleDesc		tupdesc;
+	int				attnum;
+
+	/*
+	 * It's not safe to create a parallel Insert plan if
+	 * ON CONFLICT ... DO UPDATE ... has been specified, because
+	 * parallel UPDATE is not supported.
+	 */
+	if (parse->onConflict != NULL && parse->onConflict->action == ONCONFLICT_UPDATE)
+		return false;
+
+	rte = rt_fetch(parse->resultRelation, parse->rtable);
+	rel = table_open(rte->relid, NoLock);
+
+	/*
+	 * We can't support insert by parallel workers on certain table types:
+	 * - foreign table (no FDW API for supporting parallel insert)
+	 * - temporary table (not accessible by parallel workers)
+	 */
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+		RelationUsesLocalBuffers(rel))
+	{
+		table_close(rel, NoLock);
+		return false;
+	}
+
+	/* If any triggers, check they are parallel safe. */
+	if (rel->trigdesc != NULL &&
+		!IsTriggerDataParallelInsertSafe(rel->trigdesc))
+	{
+		table_close(rel, NoLock);
+		return false;
+	}
+
+	/*
+	 * Check if any of the columns has a non-parallel-safe
+	 * volatile default expression.
+	 */
+	tupdesc = RelationGetDescr(rel);
+	for (attnum = 0; attnum < tupdesc->natts; attnum++)
+	{
+		Expr *defexpr;
+		bool isVolatileExpr;
+
+		Form_pg_attribute att = TupleDescAttr(tupdesc, attnum);
+
+		/* We don't need info for dropped or generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		if (att->atthasdef)
+		{
+			defexpr = (Expr *)build_column_default(rel, attnum + 1);
+
+			/* Run the expression through planner */
+			defexpr = expression_planner(defexpr);
+
+			isVolatileExpr = contain_volatile_functions((Node *)defexpr);
+			if (isVolatileExpr &&
+				(max_parallel_hazard((Query *)defexpr)) != PROPARALLEL_SAFE)
+			{
+				table_close(rel, NoLock);
+				return false;
+			}
+		}
+	}
+
+	/*
+	 * Check if there are any CHECK constraints which are not parallel-safe.
+	 */
+	if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0)
+	{
+		int i;
+
+		ConstrCheck *check = tupdesc->constr->check;
+
+		for (i = 0; i < tupdesc->constr->num_check; i++)
+		{
+			Expr *checkExpr = stringToNode(check[i].ccbin);
+			bool isVolatileExpr = contain_volatile_functions((Node *)checkExpr);
+			if (isVolatileExpr &&
+				(max_parallel_hazard((Query *)checkExpr)) != PROPARALLEL_SAFE)
+			{
+				table_close(rel, NoLock);
+				return false;
+			}
+		}
+	}
+
+	table_close(rel, NoLock);
+	return true;
+}
+
+/*
  * apply_scanjoin_target_to_paths
  *
  * Adjust the final scan/join relation, and recursively all of its children,
@@ -7573,7 +7830,24 @@ apply_scanjoin_target_to_paths(PlannerInfo *root,
 	 * one of the generated paths may turn out to be the cheapest one.
 	 */
 	if (rel->consider_parallel && !IS_OTHER_REL(rel))
-		generate_useful_gather_paths(root, rel, false);
+	{
+		if (root->parse->commandType == CMD_INSERT)
+		{
+			if (!IsParallelInsertSafe(root->parse))
+			{
+				/*
+				 * Don't allow parallel INSERT because it's not safe, but do
+				 * allow any underlying query to be run by parallel workers.
+				 */
+				generate_useful_gather_paths(root, rel, false);
+				rel->consider_parallel = false;
+			}
+		}
+		else
+		{
+			generate_useful_gather_paths(root, rel, false);
+		}
+	}
 
 	/*
 	 * Reassess which paths are the cheapest, now that we've potentially added
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index dd8e2e9..e9dcd30 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -252,6 +252,7 @@ set_plan_references(PlannerInfo *root, Plan *plan)
 	PlannerGlobal *glob = root->glob;
 	int			rtoffset = list_length(glob->finalrtable);
 	ListCell   *lc;
+	Plan		*finalPlan;
 
 	/*
 	 * Add all the query's RTEs to the flattened rangetable.  The live ones
@@ -302,7 +303,16 @@ set_plan_references(PlannerInfo *root, Plan *plan)
 	}
 
 	/* Now fix the Plan tree */
-	return set_plan_refs(root, plan, rtoffset);
+	finalPlan = set_plan_refs(root, plan, rtoffset);
+	if (finalPlan != NULL && IsA(finalPlan, Gather))
+	{
+		Plan *subplan = outerPlan(finalPlan);
+		if (IsA(subplan, ModifyTable) && castNode(ModifyTable, subplan)->returningLists != NULL)
+		{
+			finalPlan->targetlist = outerPlan(finalPlan)->targetlist;
+		}
+	}
+	return finalPlan;
 }
 
 /*
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index c1fc866..4a9c3fa 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -3538,11 +3538,11 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
 						List *subroots,
 						List *withCheckOptionLists, List *returningLists,
 						List *rowMarks, OnConflictExpr *onconflict,
-						int epqParam)
+						int epqParam,
+						int parallel_workers)
 {
+	ListCell *lc;
 	ModifyTablePath *pathnode = makeNode(ModifyTablePath);
-	double		total_size;
-	ListCell   *lc;
 
 	Assert(list_length(resultRelations) == list_length(subpaths));
 	Assert(list_length(resultRelations) == list_length(subroots));
@@ -3557,45 +3557,22 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->path.pathtarget = rel->reltarget;
 	/* For now, assume we are above any joins, so no parameterization */
 	pathnode->path.param_info = NULL;
-	pathnode->path.parallel_aware = false;
-	pathnode->path.parallel_safe = false;
-	pathnode->path.parallel_workers = 0;
-	pathnode->path.pathkeys = NIL;
-
-	/*
-	 * Compute cost & rowcount as sum of subpath costs & rowcounts.
-	 *
-	 * Currently, we don't charge anything extra for the actual table
-	 * modification work, nor for the WITH CHECK OPTIONS or RETURNING
-	 * expressions if any.  It would only be window dressing, since
-	 * ModifyTable is always a top-level node and there is no way for the
-	 * costs to change any higher-level planning choices.  But we might want
-	 * to make it look better sometime.
-	 */
-	pathnode->path.startup_cost = 0;
-	pathnode->path.total_cost = 0;
-	pathnode->path.rows = 0;
-	total_size = 0;
-	foreach(lc, subpaths)
+	pathnode->path.parallel_aware = (parallel_workers > 0);
+	pathnode->path.parallel_safe = rel->consider_parallel;
+	if (rel->consider_parallel)
 	{
-		Path	   *subpath = (Path *) lfirst(lc);
-
-		if (lc == list_head(subpaths))	/* first node? */
-			pathnode->path.startup_cost = subpath->startup_cost;
-		pathnode->path.total_cost += subpath->total_cost;
-		pathnode->path.rows += subpath->rows;
-		total_size += subpath->pathtarget->width * subpath->rows;
+		foreach(lc, subpaths)
+		{
+			Path	   *sp = (Path *) lfirst(lc);
+			if (!sp->parallel_safe)
+			{
+				pathnode->path.parallel_safe = false;
+				break;
+			}
+		}
 	}
-
-	/*
-	 * Set width to the average width of the subpath outputs.  XXX this is
-	 * totally wrong: we should report zero if no RETURNING, else an average
-	 * of the RETURNING tlist widths.  But it's what happened historically,
-	 * and improving it is a task for another day.
-	 */
-	if (pathnode->path.rows > 0)
-		total_size /= pathnode->path.rows;
-	pathnode->path.pathtarget->width = rint(total_size);
+	pathnode->path.parallel_workers = parallel_workers;
+	pathnode->path.pathkeys = NIL;
 
 	pathnode->operation = operation;
 	pathnode->canSetTag = canSetTag;
@@ -3611,6 +3588,8 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->onconflict = onconflict;
 	pathnode->epqParam = epqParam;
 
+	cost_modifytable(pathnode);
+
 	return pathnode;
 }
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index df1b43a..96295bc 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -385,6 +385,7 @@ extern FullTransactionId GetTopFullTransactionId(void);
 extern FullTransactionId GetTopFullTransactionIdIfAny(void);
 extern FullTransactionId GetCurrentFullTransactionId(void);
 extern FullTransactionId GetCurrentFullTransactionIdIfAny(void);
+extern void SetCurrentCommandIdUsedForWorker(void);
 extern void MarkCurrentTransactionIdLoggedIfAny(void);
 extern bool SubTransactionIsActive(SubTransactionId subxid);
 extern CommandId GetCurrentCommandId(bool used);
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a39a5b..afb8a57 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo
 	ParallelContext *pcxt;		/* parallel context we're using */
 	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
 	WalUsage   *wal_usage;		/* walusage area in DSM */
+	uint64   *processed_count;	/* processed tuple count area in DSM */
 	SharedExecutorInstrumentation *instrumentation; /* optional */
 	struct SharedJitInstrumentation *jit_instrumentation;	/* optional */
 	dsa_area   *area;			/* points to DSA area in DSM */
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 4ec4ebd..fbddee2 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -20,5 +20,6 @@ extern void ExecComputeStoredGenerated(EState *estate, TupleTableSlot *slot, Cmd
 extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags);
 extern void ExecEndModifyTable(ModifyTableState *node);
 extern void ExecReScanModifyTable(ModifyTableState *node);
-
+extern void fireBSTriggers(ModifyTableState *node);
+extern void fireASTriggers(ModifyTableState *node);
 #endif							/* NODEMODIFYTABLE_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ef448d6..cc744b6 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1163,7 +1163,8 @@ typedef struct ModifyTableState
 										 * table root) */
 	List	  **mt_arowmarks;	/* per-subplan ExecAuxRowMark lists */
 	EPQState	mt_epqstate;	/* for evaluating EvalPlanQual rechecks */
-	bool		fireBSTriggers; /* do we need to fire stmt triggers? */
+	bool		fireBSTriggers; /* do we need to fire before stmt triggers? */
+	bool		fireASTriggers; /* do we need to fire after stmt triggers? */
 
 	/*
 	 * Slot for storing tuples in the root partitioned table's rowtype during
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 6141654..fafa087 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -69,6 +69,7 @@ extern PGDLLIMPORT int constraint_exclusion;
 
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
 								  double index_pages, PlannerInfo *root);
+extern void cost_modifytable(ModifyTablePath *path);
 extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 						 ParamPathInfo *param_info);
 extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 715a24a..2d08f0c 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -264,7 +264,8 @@ extern ModifyTablePath *create_modifytable_path(PlannerInfo *root,
 												List *subroots,
 												List *withCheckOptionLists, List *returningLists,
 												List *rowMarks, OnConflictExpr *onconflict,
-												int epqParam);
+												int epqParam,
+												int parallel_workers);
 extern LimitPath *create_limit_path(PlannerInfo *root, RelOptInfo *rel,
 									Path *subpath,
 									Node *limitOffset, Node *limitCount,
