On Wed, Dec 9, 2020 at 10:16 AM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Tue, Dec 8, 2020 at 6:24 PM Hou, Zhijie <houzj.f...@cn.fujitsu.com> wrote:
> >
> > > > I'm not quite sure how to address this. Can we not allow the planner
> > > > to consider that the select is for CTAS and check only after the
> > > > planning is done for the Gather node and other checks?
> > > >
> > >
> > > IIUC, you are saying that we should not influence the cost of gather node
> > > even when the insertion would be done by workers? I think that should be
> > > our fallback option anyway but that might miss some paths to be considered
> > > parallel where the cost becomes more due to parallel_tuple_cost (aka tuple
> > > transfer cost). I think the idea is we can avoid the tuple transfer cost
> > > only when Gather is the top node because only at that time we can push
> > > insertion down, right? How about if we have some way to detect the same
> > > before calling generate_useful_gather_paths()? I think when we are calling
> > > apply_scanjoin_target_to_paths() in grouping_planner(), if the
> > > query_level is 1, it is for CTAS, and it doesn't have a chance to create
> > > UPPER_REL (doesn't have grouping, order, limit, etc clause) then we can
> > > probably assume that the Gather will be top_node. I am not sure about this
> > > but I think it is worth exploring.
> > >
> >
> > I took a look at the parallel insert patch and have the same idea.
> > https://commitfest.postgresql.org/31/2844/
> >
> >          * Consider generating Gather or Gather Merge paths.  We must only do this
> >          * if the relation is parallel safe, and we don't do it for child rels to
> >          * avoid creating multiple Gather nodes within the same plan. We must do
> >          * this after all paths have been generated and before set_cheapest, since
> >          * one of the generated paths may turn out to be the cheapest one.
> >          */
> >         if (rel->consider_parallel && !IS_OTHER_REL(rel))
> >                 generate_useful_gather_paths(root, rel, false);
> >
> > IMO the Gather path created here seems to be the right one that could
> > ignore the parallel tuple cost in the CTAS case.
> > But we need to check the following parse options, each of which would
> > create a path that becomes the parent of the Gather path here.
> >
> > if (root->parse->rowMarks)
> > if (limit_needed(root->parse))
> > if (root->parse->sortClause)
> > if (root->parse->distinctClause)
> > if (root->parse->hasWindowFuncs)
> > if (root->parse->groupClause || root->parse->groupingSets ||
> >     root->parse->hasAggs || root->hasHavingQual)
> >
>
> Yeah, and as I pointed out earlier, along with this we also need to
> consider that the RelOptInfo must be the final target (top-level rel).
>
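
To make the condition discussed above concrete, here is a minimal sketch of
the proposed check: Gather can be assumed to be the top node only when the
query is at level 1, is for CTAS, and has none of the clauses that would put
another path above Gather. SketchQuery and gather_will_be_top_node() are
hypothetical names used only for illustration; the fields merely mirror the
parse options listed above and are not the real Query/PlannerInfo structs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, flattened stand-in for the planner fields named in the thread. */
typedef struct SketchQuery
{
	int			query_level;		/* 1 for the outermost query */
	bool		is_ctas;			/* SELECT belongs to CREATE TABLE AS */
	bool		has_row_marks;		/* root->parse->rowMarks */
	bool		limit_needed;		/* limit_needed(root->parse) */
	bool		has_sort_clause;	/* root->parse->sortClause */
	bool		has_distinct_clause;	/* root->parse->distinctClause */
	bool		has_window_funcs;	/* root->parse->hasWindowFuncs */
	bool		has_grouping;		/* groupClause, groupingSets, hasAggs or
									 * hasHavingQual */
} SketchQuery;

/*
 * Return true if no upper path can end up above Gather, so that the tuple
 * transfer cost could be ignored for a CTAS query.
 */
bool
gather_will_be_top_node(const SketchQuery *q)
{
	assert(q != NULL);

	if (q->query_level != 1 || !q->is_ctas)
		return false;

	return !(q->has_row_marks ||
			 q->limit_needed ||
			 q->has_sort_clause ||
			 q->has_distinct_clause ||
			 q->has_window_funcs ||
			 q->has_grouping);
}
```

A plain "CREATE TABLE t AS SELECT * FROM s" would pass this check, while
adding an ORDER BY to the SELECT would not.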

Attaching the v10 patch set, which includes the change suggested above for
ignoring the parallel tuple cost and also a few more test cases. I split the
patch as per Amit's suggestion. v10-0001 contains the parallel inserts
code and test cases, without the planner tuple cost changes. v10-0002 has
the required changes for ignoring the planner tuple cost calculations.
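
For reviewers who want a feel for what is at stake in the tuple cost change,
here is a rough sketch of the arithmetic involved. It is modeled on how
cost_gather() charges parallel_setup_cost once and parallel_tuple_cost per
row, using the default values of those two settings. The function name and
the ignore_tuple_cost flag are hypothetical and only illustrate the idea;
this is not the code in v10-0002.

```c
#include <assert.h>
#include <stdbool.h>

/* Default values of the two planner settings involved. */
static const double parallel_setup_cost = 1000.0;
static const double parallel_tuple_cost = 0.1;

/*
 * Simplified total cost of a Gather path on top of a subpath.  When
 * ignore_tuple_cost is true (hypothetical flag), the per-row transfer
 * charge is skipped, as it would be if the workers inserted the rows
 * themselves instead of sending them to the leader.
 */
double
sketch_gather_total_cost(double sub_startup_cost, double sub_total_cost,
						 double rows, bool ignore_tuple_cost)
{
	double		startup_cost = sub_startup_cost + parallel_setup_cost;
	double		run_cost = sub_total_cost - sub_startup_cost;

	assert(rows >= 0.0);

	if (!ignore_tuple_cost)
		run_cost += parallel_tuple_cost * rows;

	return startup_cost + run_cost;
}
```

The difference between the two variants grows linearly with the row
estimate, which is why a large CTAS can lose its parallel plan purely
because of the transfer charge.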

Please review it further.

After the review and addressing all the comments, I plan to make some
code common so that it can be used for Parallel Inserts in REFRESH
MATERIALIZED VIEW. Thoughts?

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From 2939b2c51bff3548ea15c9054f9e2ab661dddd9b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Thu, 10 Dec 2020 05:38:35 +0530
Subject: [PATCH v10] Parallel Inserts in CREATE TABLE AS

The idea of this patch is to allow the leader and each worker to
insert tuples in parallel if the SELECT part of the CTAS is
parallelizable.

The design:
Let the planner know that the SELECT is from CTAS in createas.c
so that it can set the number of tuples transferred from the
workers to the Gather node to 0. With this change, the planner is
more likely to choose a parallel plan. After planning, check in
createas.c whether the upper plan node is Gather and, if so, mark a
parallelism flag in the CTAS dest receiver. Pass the into clause,
object id and command id from the leader to the workers, so that
each worker can create its own CTAS dest receiver. The leader
inserts its share of tuples if instructed to do so, and so do the
workers. Each worker atomically adds its number of inserted tuples
to a shared memory variable; the leader combines this with its own
number of inserted tuples and reports the total to the client.
---
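
Note on the row count handoff described in the design above: a minimal
sketch of the accounting, using C11 atomics in place of the pg_atomic_*
API and plain function calls in place of real worker processes. All names
below (SharedInsertState, worker_report, leader_total) are hypothetical
and exist only for this illustration.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the counter kept in shared memory. */
typedef struct SharedInsertState
{
	atomic_uint_fast64_t processed;	/* tuples inserted by all workers */
} SharedInsertState;

/* Leader: initialize the counter before any worker starts. */
void
shared_state_init(SharedInsertState *s)
{
	assert(s != NULL);
	atomic_init(&s->processed, 0);
}

/* Worker: publish its own insert count once, when it finishes. */
void
worker_report(SharedInsertState *s, uint64_t inserted)
{
	assert(s != NULL);
	atomic_fetch_add(&s->processed, inserted);
}

/* Leader: after waiting for the workers, add its own share. */
uint64_t
leader_total(SharedInsertState *s, uint64_t leader_inserted)
{
	assert(s != NULL);
	return leader_inserted + atomic_load(&s->processed);
}
```

Because each worker adds to the counter only once, contention on the shared
variable stays negligible regardless of how many rows are inserted.
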
 src/backend/access/heap/heapam.c             |  11 -
 src/backend/access/transam/xact.c            |  30 +-
 src/backend/commands/createas.c              | 332 ++++++++----
 src/backend/commands/explain.c               |  32 ++
 src/backend/executor/execParallel.c          |  70 ++-
 src/backend/executor/nodeGather.c            | 113 ++++-
 src/backend/executor/nodeGatherMerge.c       |   4 +-
 src/include/access/xact.h                    |   1 +
 src/include/commands/createas.h              |  28 ++
 src/include/executor/execParallel.h          |   6 +-
 src/include/nodes/execnodes.h                |   3 +
 src/test/regress/expected/write_parallel.out | 504 +++++++++++++++++++
 src/test/regress/sql/write_parallel.sql      | 180 +++++++
 13 files changed, 1174 insertions(+), 140 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a9583f3103..86347ba273 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2043,17 +2043,6 @@ static HeapTuple
 heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
 					CommandId cid, int options)
 {
-	/*
-	 * To allow parallel inserts, we need to ensure that they are safe to be
-	 * performed in workers. We have the infrastructure to allow parallel
-	 * inserts in general except for the cases where inserts generate a new
-	 * CommandId (eg. inserts into a table having a foreign key column).
-	 */
-	if (IsParallelWorker())
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot insert tuples in a parallel worker")));
-
 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
 	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
 	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9cd0b7c11b..db6eedd635 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -763,18 +763,34 @@ GetCurrentCommandId(bool used)
 	/* this is global to a transaction, not subtransaction-local */
 	if (used)
 	{
-		/*
-		 * Forbid setting currentCommandIdUsed in a parallel worker, because
-		 * we have no provision for communicating this back to the leader.  We
-		 * could relax this restriction when currentCommandIdUsed was already
-		 * true at the start of the parallel operation.
-		 */
-		Assert(!IsParallelWorker());
+		 /*
+		  * This is a temporary hack for all common parallel insert cases i.e.
+		  * insert into, ctas, copy from. To be changed later. In a parallel
+		  * worker, currentCommandIdUsed must already have been set to true at
+		  * the start of the parallel operation (by way of
+		  * SetCurrentCommandIdUsedForWorker()), as asserted below. We need this
+		  * because GetCurrentCommandId(true) may be called from anywhere within
+		  * a parallel worker, especially for parallel inserts.
+		  */
+		Assert(!(IsParallelWorker() && !currentCommandIdUsed));
 		currentCommandIdUsed = true;
 	}
 	return currentCommandId;
 }
 
+/*
+ *	SetCurrentCommandIdUsedForWorker
+ *
+ * For a parallel worker, record that the currentCommandId has been used. This
+ * must only be called at the start of a parallel operation.
+ */
+void
+SetCurrentCommandIdUsedForWorker(void)
+{
+	Assert(IsParallelWorker() && !currentCommandIdUsed && currentCommandId != InvalidCommandId);
+	currentCommandIdUsed = true;
+}
+
 /*
  *	SetParallelStartTimestamps
  *
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 6bf6c5a310..20f59cc2b8 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -51,18 +51,6 @@
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
 
-typedef struct
-{
-	DestReceiver pub;			/* publicly-known function pointers */
-	IntoClause *into;			/* target relation specification */
-	/* These fields are filled by intorel_startup: */
-	Relation	rel;			/* relation to write to */
-	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
-} DR_intorel;
-
 /* utility functions for CTAS definition creation */
 static ObjectAddress create_ctas_internal(List *attrList, IntoClause *into);
 static ObjectAddress create_ctas_nodata(List *tlist, IntoClause *into);
@@ -350,6 +338,15 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 		/* call ExecutorStart to prepare the plan for execution */
 		ExecutorStart(queryDesc, GetIntoRelEFlags(into));
 
+		/*
+		 * If the SELECT part of the CTAS is parallelizable, then make each
+		 * parallel worker insert the tuples resulting from its execution
+		 * into the target table. We need plan state to be initialized by the
+		 * executor to decide whether to allow parallel inserts or not.
+		 */
+		if (IsParallelInsertInCTASAllowed(into, queryDesc))
+			SetCTASParallelInsertState(queryDesc);
+
 		/* run the plan to completion */
 		ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
 
@@ -418,6 +415,9 @@ CreateIntoRelDestReceiver(IntoClause *intoClause)
 	self->pub.rDestroy = intorel_destroy;
 	self->pub.mydest = DestIntoRel;
 	self->into = intoClause;
+	self->is_parallel = false;
+	self->is_parallel_worker = false;
+	self->object_id = InvalidOid;
 	/* other private fields will be set during intorel_startup */
 
 	return (DestReceiver *) self;
@@ -430,121 +430,169 @@ static void
 intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 {
 	DR_intorel *myState = (DR_intorel *) self;
-	IntoClause *into = myState->into;
-	bool		is_matview;
-	List	   *attrList;
 	ObjectAddress intoRelationAddr;
 	Relation	intoRelationDesc;
-	ListCell   *lc;
-	int			attnum;
 
-	Assert(into != NULL);		/* else somebody forgot to set it */
-
-	/* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */
-	is_matview = (into->viewQuery != NULL);
+	if (myState->is_parallel_worker)
+	{
+		/* In the worker */
+		intoRelationDesc = table_open(myState->object_id, AccessExclusiveLock);
+		myState->rel = intoRelationDesc;
+		myState->reladdr = InvalidObjectAddress;
+		myState->ti_options = 0;
+		myState->bistate = GetBulkInsertState();
 
-	/*
-	 * Build column definitions using "pre-cooked" type and collation info. If
-	 * a column name list was specified in CREATE TABLE AS, override the
-	 * column names derived from the query.  (Too few column names are OK, too
-	 * many are not.)
-	 */
-	attrList = NIL;
-	lc = list_head(into->colNames);
-	for (attnum = 0; attnum < typeinfo->natts; attnum++)
+		/*
+		 * Right after the table is created in the leader, the command id is
+		 * incremented (in create_ctas_internal()). The new command id is
+		 * marked as used in intorel_startup(), then the parallel mode is
+		 * entered. The command id and transaction id are serialized into
+		 * parallel DSM, they are then available to all parallel workers. All
+		 * the workers need to mark the command id as used before insertion.
+		 */
+		SetCurrentCommandIdUsedForWorker();
+		myState->output_cid = GetCurrentCommandId(false);
+	}
+	else
 	{
-		Form_pg_attribute attribute = TupleDescAttr(typeinfo, attnum);
-		ColumnDef  *col;
-		char	   *colname;
+		IntoClause *into = myState->into;
+		bool		is_matview;
+		List	   *attrList;
+		ListCell   *lc;
+		int			attnum;
+
+		Assert(into != NULL);		/* else somebody forgot to set it */
+
+		/*
+		 * This code supports both CREATE TABLE AS and CREATE MATERIALIZED
+		 * VIEW.
+		 */
+		is_matview = (into->viewQuery != NULL);
 
-		if (lc)
+		/*
+		 * Build column definitions using "pre-cooked" type and collation info.
+		 * If a column name list was specified in CREATE TABLE AS, override the
+		 * column names derived from the query.  (Too few column names are OK,
+		 * too many are not.)
+		 */
+		attrList = NIL;
+		lc = list_head(into->colNames);
+		for (attnum = 0; attnum < typeinfo->natts; attnum++)
 		{
-			colname = strVal(lfirst(lc));
-			lc = lnext(into->colNames, lc);
+			Form_pg_attribute attribute = TupleDescAttr(typeinfo, attnum);
+			ColumnDef  *col;
+			char	   *colname;
+
+			if (lc)
+			{
+				colname = strVal(lfirst(lc));
+				lc = lnext(into->colNames, lc);
+			}
+			else
+				colname = NameStr(attribute->attname);
+
+			col = makeColumnDef(colname,
+								attribute->atttypid,
+								attribute->atttypmod,
+								attribute->attcollation);
+
+			/*
+			 * It's possible that the column is of a collatable type but the
+			 * collation could not be resolved, so double-check.  (We must
+			 * check this here because DefineRelation would adopt the type's
+			 * default collation rather than complaining.)
+			 */
+			if (!OidIsValid(col->collOid) &&
+				type_is_collatable(col->typeName->typeOid))
+				ereport(ERROR,
+						(errcode(ERRCODE_INDETERMINATE_COLLATION),
+						errmsg("no collation was derived for column \"%s\" with collatable type %s",
+								col->colname,
+								format_type_be(col->typeName->typeOid)),
+						errhint("Use the COLLATE clause to set the collation explicitly.")));
+
+			attrList = lappend(attrList, col);
 		}
-		else
-			colname = NameStr(attribute->attname);
 
-		col = makeColumnDef(colname,
-							attribute->atttypid,
-							attribute->atttypmod,
-							attribute->attcollation);
+		if (lc != NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("too many column names were specified")));
 
 		/*
-		 * It's possible that the column is of a collatable type but the
-		 * collation could not be resolved, so double-check.  (We must check
-		 * this here because DefineRelation would adopt the type's default
-		 * collation rather than complaining.)
+		 * Actually create the target table
 		 */
-		if (!OidIsValid(col->collOid) &&
-			type_is_collatable(col->typeName->typeOid))
-			ereport(ERROR,
-					(errcode(ERRCODE_INDETERMINATE_COLLATION),
-					 errmsg("no collation was derived for column \"%s\" with collatable type %s",
-							col->colname,
-							format_type_be(col->typeName->typeOid)),
-					 errhint("Use the COLLATE clause to set the collation explicitly.")));
+		intoRelationAddr = create_ctas_internal(attrList, into);
 
-		attrList = lappend(attrList, col);
-	}
+		/*
+		 * Finally we can open the target table
+		 */
+		intoRelationDesc = table_open(intoRelationAddr.objectId, AccessExclusiveLock);
 
-	if (lc != NULL)
-		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("too many column names were specified")));
+		/*
+		 * Make sure the constructed table does not have RLS enabled.
+		 *
+		 * check_enable_rls() will ereport(ERROR) itself if the user has
+		 * requested something invalid, and otherwise will return RLS_ENABLED
+		 * if RLS should be enabled here.  We don't actually support that
+		 * currently, so throw our own ereport(ERROR) if that happens.
+		 */
+		if (check_enable_rls(intoRelationAddr.objectId, InvalidOid, false) == RLS_ENABLED)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("policies not yet implemented for this command")));
 
-	/*
-	 * Actually create the target table
-	 */
-	intoRelationAddr = create_ctas_internal(attrList, into);
+		/*
+		 * Tentatively mark the target as populated, if it's a matview and
+		 * we're going to fill it; otherwise, no change needed.
+		 */
+		if (is_matview && !into->skipData)
+			SetMatViewPopulatedState(intoRelationDesc, true);
 
-	/*
-	 * Finally we can open the target table
-	 */
-	intoRelationDesc = table_open(intoRelationAddr.objectId, AccessExclusiveLock);
+		/*
+		 * Fill private fields of myState for use by later routines
+		 */
+		myState->rel = intoRelationDesc;
+		myState->reladdr = intoRelationAddr;
+		myState->output_cid = GetCurrentCommandId(true);
+		myState->ti_options = TABLE_INSERT_SKIP_FSM;
 
-	/*
-	 * Make sure the constructed table does not have RLS enabled.
-	 *
-	 * check_enable_rls() will ereport(ERROR) itself if the user has requested
-	 * something invalid, and otherwise will return RLS_ENABLED if RLS should
-	 * be enabled here.  We don't actually support that currently, so throw
-	 * our own ereport(ERROR) if that happens.
-	 */
-	if (check_enable_rls(intoRelationAddr.objectId, InvalidOid, false) == RLS_ENABLED)
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("policies not yet implemented for this command")));
+		/*
+		 * If WITH NO DATA is specified, there is no need to set up the state
+		 * for bulk inserts as there are no tuples to insert.
+		 */
+		if (!into->skipData)
+			myState->bistate = GetBulkInsertState();
+		else
+			myState->bistate = NULL;
 
-	/*
-	 * Tentatively mark the target as populated, if it's a matview and we're
-	 * going to fill it; otherwise, no change needed.
-	 */
-	if (is_matview && !into->skipData)
-		SetMatViewPopulatedState(intoRelationDesc, true);
+		if (myState->is_parallel)
+		{
+			myState->object_id = intoRelationAddr.objectId;
 
-	/*
-	 * Fill private fields of myState for use by later routines
-	 */
-	myState->rel = intoRelationDesc;
-	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
+			/*
+			 * We don't need to skip contacting the FSM while inserting tuples
+			 * in parallel mode. While extending the relation, a worker can
+			 * check the FSM for another page that can accommodate the tuples
+			 * instead of blocking on a page while another worker is inserting
+			 * into it. This results in a major benefit for parallel inserts.
+			 */
+			myState->ti_options = 0;
 
-	/*
-	 * If WITH NO DATA is specified, there is no need to set up the state for
-	 * bulk inserts as there are no tuples to insert.
-	 */
-	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
-	else
-		myState->bistate = NULL;
+			/*
+			 * Mark rd_createSubid as invalid; otherwise, the workers are not
+			 * allowed to extend the table.
+			 */
+			myState->rel->rd_createSubid = InvalidSubTransactionId;
+		}
 
-	/*
-	 * Valid smgr_targblock implies something already wrote to the relation.
-	 * This may be harmless, but this function hasn't planned for it.
-	 */
-	Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
+		/*
+		 * Valid smgr_targblock implies something already wrote to the
+		 * relation. This may be harmless, but this function hasn't planned for
+		 * it.
+		 */
+		Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
+	}
 }
 
 /*
@@ -606,3 +654,75 @@ intorel_destroy(DestReceiver *self)
 {
 	pfree(self);
 }
+
+/*
+ * IsParallelInsertInCTASAllowed --- determine whether or not parallel
+ * insertion is possible.
+ */
+bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc)
+{
+	if (!IS_CTAS(into))
+		return false;
+
+	/*
+	 * Do not allow parallel inserts if the table is temporary. As the
+	 * temporary tables are backend local, workers can not know about them.
+	 * Currently, CTAS supports creation of normal(logged), temporary and
+	 * unlogged tables. It does not support foreign or partition table
+	 * creation. Hence the check for temporary table is enough here.
+	 */
+	if (!into->rel || into->rel->relpersistence == RELPERSISTENCE_TEMP)
+		return false;
+
+	if (queryDesc)
+	{
+		PlanState  *ps = queryDesc->planstate;
+		PlannedStmt *plannedstmt = queryDesc->plannedstmt;
+		bool  allow;
+
+		/*
+		 * We allow parallel inserts by the workers only if the upper node is
+		 * Gather and it has no projections to perform. If the Gather node has
+		 * projections, which is possible if there are any subplans in the
+		 * query, the workers cannot do those projections. And when the upper
+		 * node is GatherMerge, the leader has to perform the final phase, i.e.
+		 * merge the results produced by the workers.
+		 */
+		allow = ps && IsA(ps, GatherState) && !ps->ps_ProjInfo &&
+				plannedstmt->parallelModeNeeded &&
+				plannedstmt->planTree &&
+				IsA(plannedstmt->planTree, Gather) &&
+				plannedstmt->planTree->lefttree &&
+				plannedstmt->planTree->lefttree->parallel_aware &&
+				plannedstmt->planTree->lefttree->parallel_safe;
+
+		return allow;
+	}
+
+	return true;
+}
+
+/*
+ * SetCTASParallelInsertState --- set the info required for parallel
+ * inserts during plan execution.
+ */
+void SetCTASParallelInsertState(QueryDesc *queryDesc)
+{
+	GatherState *gstate = (GatherState *) queryDesc->planstate;
+
+	/*
+	 * For parallelizing inserts in CTAS, i.e. making each parallel worker
+	 * insert the tuples, we must send information such as the into clause
+	 * (for each worker to build a separate dest receiver) and the object id
+	 * (for each worker to open the created table).
+	 */
+	((DR_intorel *) queryDesc->dest)->is_parallel = true;
+	gstate->dest = queryDesc->dest;
+
+	/*
+	 * Since no rows are transferred from the workers to the Gather node,
+	 * set its row count to 0 so that this is visible in the estimated row
+	 * count of explain plans.
+	 */
+	queryDesc->plannedstmt->planTree->plan_rows = 0;
+}
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 43f9b01e83..03ac29cd64 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -556,6 +556,15 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 	/* call ExecutorStart to prepare the plan for execution */
 	ExecutorStart(queryDesc, eflags);
 
+	/*
+	 * If the SELECT part of the CTAS is parallelizable, then make each
+	 * parallel worker insert the tuples resulting from its execution into
+	 * the target table. We need plan state to be initialized by the executor
+	 * to decide whether to allow parallel inserts or not.
+	 */
+	if (IsParallelInsertInCTASAllowed(into, queryDesc))
+		SetCTASParallelInsertState(queryDesc);
+
 	/* Execute the plan for statistics if asked for */
 	if (es->analyze)
 	{
@@ -1775,6 +1784,29 @@ ExplainNode(PlanState *planstate, List *ancestors,
 
 				if (gather->single_copy || es->format != EXPLAIN_FORMAT_TEXT)
 					ExplainPropertyBool("Single Copy", gather->single_copy, es);
+
+				/*
+				 * Show the create table information under Gather node in case
+				 * parallel workers have inserted the rows.
+				 */
+				if (IsA(planstate, GatherState))
+				{
+					GatherState *gstate = (GatherState *) planstate;
+
+					if (IS_PARALLEL_CTAS_DEST(gstate->dest) &&
+						((DR_intorel *) gstate->dest)->into->rel &&
+						((DR_intorel *) gstate->dest)->into->rel->relname)
+					{
+						es->indent--;
+						ExplainIndentText(es);
+						appendStringInfoString(es->str, "->  ");
+						appendStringInfoString(es->str, "Create ");
+						appendStringInfo(es->str, "%s\n",
+							((DR_intorel *) gstate->dest)->into->rel->relname);
+						ExplainIndentText(es);
+						es->indent++;
+					}
+				}
 			}
 			break;
 		case T_GatherMerge:
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index befde52691..9ef33eee54 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -23,6 +23,7 @@
 
 #include "postgres.h"
 
+#include "commands/createas.h"
 #include "executor/execParallel.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
@@ -65,6 +66,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_INTO_CLAUSE		UINT64CONST(0xE00000000000000B)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
@@ -77,6 +79,9 @@ typedef struct FixedParallelExecutorState
 	dsa_pointer param_exec;
 	int			eflags;
 	int			jit_flags;
+	Oid			objectid;		/* workers to open relation/table.  */
+	/* Number of tuples inserted by all the workers. */
+	pg_atomic_uint64	processed;
 } FixedParallelExecutorState;
 
 /*
@@ -578,7 +583,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 ParallelExecutorInfo *
 ExecInitParallelPlan(PlanState *planstate, EState *estate,
 					 Bitmapset *sendParams, int nworkers,
-					 int64 tuples_needed)
+					 int64 tuples_needed, IntoClause *intoclause,
+					 Oid objectid)
 {
 	ParallelExecutorInfo *pei;
 	ParallelContext *pcxt;
@@ -600,6 +606,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	Size		dsa_minsize = dsa_minimum_size();
 	char	   *query_string;
 	int			query_len;
+	char 		*intoclausestr = NULL;
+	int			intoclause_len = 0;
 
 	/*
 	 * Force any initplan outputs that we're going to pass to workers to be
@@ -712,6 +720,15 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	/* Estimate space for into clause for CTAS. */
+	if (IS_CTAS(intoclause) && OidIsValid(objectid))
+	{
+		intoclausestr = nodeToString(intoclause);
+		intoclause_len = strlen(intoclausestr);
+		shm_toc_estimate_chunk(&pcxt->estimator, intoclause_len + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
 	/* Everyone's had a chance to ask for space, so now create the DSM. */
 	InitializeParallelDSM(pcxt);
 
@@ -729,6 +746,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	fpes->param_exec = InvalidDsaPointer;
 	fpes->eflags = estate->es_top_eflags;
 	fpes->jit_flags = estate->es_jit_flags;
+	pg_atomic_init_u64(&fpes->processed, 0);
+	pei->processed = &fpes->processed;
+
+	if (intoclausestr && OidIsValid(objectid))
+		fpes->objectid = objectid;
+	else
+		fpes->objectid = InvalidOid;
+
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
 
 	/* Store query string */
@@ -758,8 +783,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
 	pei->wal_usage = walusage_space;
 
-	/* Set up the tuple queues that the workers will write into. */
-	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
+	if (intoclausestr)
+	{
+		char *intoclause_space = shm_toc_allocate(pcxt->toc,
+												  intoclause_len + 1);
+		memcpy(intoclause_space, intoclausestr, intoclause_len + 1);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, intoclause_space);
+	}
+	else
+	{
+		/* Set up the tuple queues that the workers will write into. */
+		pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
+	}
 
 	/* We don't need the TupleQueueReaders yet, though. */
 	pei->reader = NULL;
@@ -1387,12 +1422,30 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 	ParallelWorkerContext pwcxt;
+	char		*intoclausestr = NULL;
+	IntoClause	*intoclause = NULL;
 
 	/* Get fixed-size state. */
 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
 
-	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
-	receiver = ExecParallelGetReceiver(seg, toc);
+	intoclausestr = shm_toc_lookup(toc, PARALLEL_KEY_INTO_CLAUSE, true);
+	if (intoclausestr)
+	{
+		/*
+		 * If the worker is for parallel insert in CTAS, then use the proper
+		 * dest receiver.
+		 */
+		intoclause = (IntoClause *) stringToNode(intoclausestr);
+		receiver = CreateIntoRelDestReceiver(intoclause);
+		((DR_intorel *)receiver)->is_parallel_worker = true;
+		((DR_intorel *)receiver)->object_id = fpes->objectid;
+	}
+	else
+	{
+		/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
+		receiver = ExecParallelGetReceiver(seg, toc);
+	}
+
 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
 	if (instrumentation != NULL)
 		instrument_options = instrumentation->instrument_options;
@@ -1471,6 +1524,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 			queryDesc->estate->es_jit->instr;
 	}
 
+	/*
+	 * Write out the number of tuples this worker has inserted. Leader will use
+	 * it to inform the end client.
+	 */
+	if (intoclausestr)
+		pg_atomic_add_fetch_u64(&fpes->processed, queryDesc->estate->es_processed);
+
 	/* Must do this after capturing instrumentation. */
 	ExecutorEnd(queryDesc);
 
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index a01b46af14..e7c588c66a 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -32,6 +32,7 @@
 
 #include "access/relscan.h"
 #include "access/xact.h"
+#include "commands/createas.h"
 #include "executor/execdebug.h"
 #include "executor/execParallel.h"
 #include "executor/nodeGather.h"
@@ -48,6 +49,7 @@ static TupleTableSlot *ExecGather(PlanState *pstate);
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static MinimalTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
+static void ExecParallelInsertInCTAS(GatherState *node);
 
 
 /* ----------------------------------------------------------------
@@ -131,6 +133,72 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	return gatherstate;
 }
 
+/* ----------------------------------------------------------------
+ *		ExecParallelInsertInCTAS(node)
+ *
+ *		Facilitates parallel inserts by parallel workers and/or
+ *		leader for Create Table AS.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecParallelInsertInCTAS(GatherState *node)
+{
+	/* Enable leader to insert in case no parallel workers were launched. */
+	if (node->nworkers_launched == 0)
+		node->need_to_scan_locally = true;
+
+	/*
+	 * By now, the parallel workers (if any were launched) would have started
+	 * their work, i.e. insertion into the target table. If the leader is
+	 * chosen to participate in parallel inserts in CTAS, then it finishes
+	 * its share before going on to wait for the parallel workers to finish.
+	 */
+	if (node->need_to_scan_locally)
+	{
+		EState	   *estate = node->ps.state;
+		TupleTableSlot *outerTupleSlot;
+
+		for (;;)
+		{
+			/* Install our DSA area while executing the plan. */
+			estate->es_query_dsa =
+					node->pei ? node->pei->area : NULL;
+
+			outerTupleSlot = ExecProcNode(node->ps.lefttree);
+
+			estate->es_query_dsa = NULL;
+
+			if (TupIsNull(outerTupleSlot))
+				break;
+
+			(void) node->dest->receiveSlot(outerTupleSlot, node->dest);
+
+			node->ps.state->es_processed++;
+		}
+
+		node->need_to_scan_locally = false;
+	}
+
+	if (node->nworkers_launched > 0)
+	{
+		/*
+		 * We wait here for the parallel workers to finish their work and
+		 * accumulate the tuples they inserted and also their buffer/WAL usage.
+		 * We do not destroy the parallel context here, it will be done in
+		 * ExecShutdownGather at the end of the plan. Note that the
+		 * ExecShutdownGatherWorkers call from ExecShutdownGather will be a
+		 * no-op.
+		 */
+		ExecShutdownGatherWorkers(node);
+
+		/*
+		 * Add the total number of tuples inserted by all workers to the tuples
+		 * inserted by the leader (if any). This will be reported to the client.
+		 */
+		node->ps.state->es_processed += pg_atomic_read_u64(node->pei->processed);
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecGather(node)
  *
@@ -157,6 +225,7 @@ ExecGather(PlanState *pstate)
 	{
 		EState	   *estate = node->ps.state;
 		Gather	   *gather = (Gather *) node->ps.plan;
+		bool		isctas = IS_PARALLEL_CTAS_DEST(node->dest);
 
 		/*
 		 * Sometimes we might have to run without parallelism; but if parallel
@@ -165,6 +234,18 @@ ExecGather(PlanState *pstate)
 		if (gather->num_workers > 0 && estate->es_use_parallel_mode)
 		{
 			ParallelContext *pcxt;
+			IntoClause	*intoclause = NULL;
+			Oid			objectid = InvalidOid;
+
+			/*
+			 * Collect the information that must be passed to the workers for
+			 * parallel inserts in CTAS.
+			 */
+			if (isctas)
+			{
+				intoclause = ((DR_intorel *) node->dest)->into;
+				objectid = ((DR_intorel *) node->dest)->object_id;
+			}
 
 			/* Initialize, or re-initialize, shared state needed by workers. */
 			if (!node->pei)
@@ -172,7 +253,10 @@ ExecGather(PlanState *pstate)
 												 estate,
 												 gather->initParam,
 												 gather->num_workers,
-												 node->tuples_needed);
+												 node->tuples_needed,
+												 /* CTAS info */
+												 intoclause,
+												 objectid);
 			else
 				ExecParallelReinitialize(node->ps.lefttree,
 										 node->pei,
@@ -190,13 +274,16 @@ ExecGather(PlanState *pstate)
 			/* Set up tuple queue readers to read the results. */
 			if (pcxt->nworkers_launched > 0)
 			{
-				ExecParallelCreateReaders(node->pei);
-				/* Make a working array showing the active readers */
-				node->nreaders = pcxt->nworkers_launched;
-				node->reader = (TupleQueueReader **)
-					palloc(node->nreaders * sizeof(TupleQueueReader *));
-				memcpy(node->reader, node->pei->reader,
-					   node->nreaders * sizeof(TupleQueueReader *));
+				if (!isctas)
+				{
+					ExecParallelCreateReaders(node->pei);
+					/* Make a working array showing the active readers */
+					node->nreaders = pcxt->nworkers_launched;
+					node->reader = (TupleQueueReader **)
+						palloc(node->nreaders * sizeof(TupleQueueReader *));
+					memcpy(node->reader, node->pei->reader,
+						node->nreaders * sizeof(TupleQueueReader *));
+				}
 			}
 			else
 			{
@@ -208,9 +295,17 @@ ExecGather(PlanState *pstate)
 		}
 
 		/* Run plan locally if no workers or enabled and not single-copy. */
-		node->need_to_scan_locally = (node->nreaders == 0)
+		node->need_to_scan_locally = (node->nreaders == 0 &&
+			!isctas)
 			|| (!gather->single_copy && parallel_leader_participation);
 		node->initialized = true;
+
+		/* Perform parallel inserts for CTAS. */
+		if (isctas)
+		{
+			ExecParallelInsertInCTAS(node);
+			return NULL;
+		}
 	}
 
 	/*
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 47129344f3..ee45272c17 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -216,7 +216,9 @@ ExecGatherMerge(PlanState *pstate)
 												 estate,
 												 gm->initParam,
 												 gm->num_workers,
-												 node->tuples_needed);
+												 node->tuples_needed,
+												 NULL,
+												 InvalidOid);
 			else
 				ExecParallelReinitialize(node->ps.lefttree,
 										 node->pei,
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 7320de345c..5beae6c617 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -389,6 +389,7 @@ extern FullTransactionId GetCurrentFullTransactionIdIfAny(void);
 extern void MarkCurrentTransactionIdLoggedIfAny(void);
 extern bool SubTransactionIsActive(SubTransactionId subxid);
 extern CommandId GetCurrentCommandId(bool used);
+extern void SetCurrentCommandIdUsedForWorker(void);
 extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts);
 extern TimestampTz GetCurrentTransactionStartTimestamp(void);
 extern TimestampTz GetCurrentStatementStartTimestamp(void);
diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h
index 7629230254..ab3aab58c5 100644
--- a/src/include/commands/createas.h
+++ b/src/include/commands/createas.h
@@ -14,12 +14,35 @@
 #ifndef CREATEAS_H
 #define CREATEAS_H
 
+#include "access/heapam.h"
 #include "catalog/objectaddress.h"
+#include "executor/execdesc.h"
 #include "nodes/params.h"
+#include "nodes/plannodes.h"
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 #include "utils/queryenvironment.h"
 
+typedef struct
+{
+	DestReceiver pub;			/* publicly-known function pointers */
+	IntoClause *into;			/* target relation specification */
+	/* These fields are filled by intorel_startup: */
+	Relation	rel;			/* relation to write to */
+	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
+	CommandId	output_cid;		/* cmin to insert in output tuples */
+	int			ti_options;		/* table_tuple_insert performance options */
+	BulkInsertState bistate;	/* bulk insert state */
+	bool		is_parallel;	/* is parallelism to be considered? */
+	bool		is_parallel_worker; /* true for parallel worker */
+	/* Used for table open by parallel worker. */
+	Oid			object_id;
+} DR_intorel;
+
+#define IS_CTAS(intoclause) ((intoclause) && IsA(intoclause, IntoClause))
+#define IS_PARALLEL_CTAS_DEST(dest) ((dest) && (dest)->mydest == DestIntoRel && \
+									 IS_CTAS(((DR_intorel *) (dest))->into) && \
+									 ((DR_intorel *) (dest))->is_parallel)
 
 extern ObjectAddress ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 									   ParamListInfo params, QueryEnvironment *queryEnv,
@@ -29,4 +52,9 @@ extern int	GetIntoRelEFlags(IntoClause *intoClause);
 
 extern DestReceiver *CreateIntoRelDestReceiver(IntoClause *intoClause);
 
+extern bool IsParallelInsertInCTASAllowed(IntoClause *into,
+										  QueryDesc *queryDesc);
+
+extern void SetCTASParallelInsertState(QueryDesc *queryDesc);
+
 #endif							/* CREATEAS_H */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a39a5b29c..9f959f741b 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -35,11 +35,15 @@ typedef struct ParallelExecutorInfo
 	/* These two arrays have pcxt->nworkers_launched entries: */
 	shm_mq_handle **tqueue;		/* tuple queues for worker output */
 	struct TupleQueueReader **reader;	/* tuple reader/writer support */
+	/* Number of tuples inserted by all workers. */
+	volatile pg_atomic_uint64	*processed;
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 												  EState *estate, Bitmapset *sendParam, int nworkers,
-												  int64 tuples_needed);
+												  int64 tuples_needed,
+												  IntoClause *intoclause,
+												  Oid objectid);
 extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 61ba4c3666..e9c4442c22 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -23,6 +23,7 @@
 #include "nodes/tidbitmap.h"
 #include "partitioning/partdefs.h"
 #include "storage/condition_variable.h"
+#include "tcop/dest.h"
 #include "utils/hsearch.h"
 #include "utils/queryenvironment.h"
 #include "utils/reltrigger.h"
@@ -2326,6 +2327,8 @@ typedef struct GatherState
 	int			nreaders;		/* number of still-active workers */
 	int			nextreader;		/* next one to try to read from */
 	struct TupleQueueReader **reader;	/* array with nreaders active entries */
+	/* Information related to parallel inserts in CTAS. */
+	DestReceiver *dest;
 } GatherState;
 
 /* ----------------
diff --git a/src/test/regress/expected/write_parallel.out b/src/test/regress/expected/write_parallel.out
index 0c4da2591a..8831597d54 100644
--- a/src/test/regress/expected/write_parallel.out
+++ b/src/test/regress/expected/write_parallel.out
@@ -76,4 +76,508 @@ explain (costs off) create table parallel_write as execute prep_stmt;
 
 create table parallel_write as execute prep_stmt;
 drop table parallel_write;
+--
+-- Test parallel inserts in create table as/select into/create materialized
+-- view.
+--
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select length(stringu1) from tenk1;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Gather (actual rows=0 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+ ->  Create parallel_write
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+drop table parallel_write;
+-- parallel inserts must not occur as the table is temporary
+explain (costs off, analyze on, timing off, summary off)
+create temporary table parallel_write as select length(stringu1) from tenk1;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Gather (actual rows=10000 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(4 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create unlogged table parallel_write as select length(stringu1) from tenk1;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Gather (actual rows=0 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+ ->  Create parallel_write
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into parallel_write from tenk1;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Gather (actual rows=0 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+ ->  Create parallel_write
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+drop table parallel_write;
+-- parallel inserts must not occur as the table is temporary
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into temporary parallel_write from tenk1;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Gather (actual rows=10000 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(4 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into unlogged parallel_write from tenk1;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Gather (actual rows=0 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+ ->  Create parallel_write
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+drop table parallel_write;
+-- parallel inserts must not occur as the parallelism will not be picked
+-- for select part because of for update clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select length(stringu1) from tenk1 for update;
+                     QUERY PLAN                      
+-----------------------------------------------------
+ LockRows (actual rows=10000 loops=1)
+   ->  Seq Scan on tenk1 (actual rows=10000 loops=1)
+(2 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create materialized view parallel_mat_view as
+    select length(stringu1) from tenk1;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Gather (actual rows=0 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+ ->  Create parallel_mat_view
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+
+select count(*) from parallel_mat_view;
+ count 
+-------
+ 10000
+(1 row)
+
+drop materialized view parallel_mat_view;
+-- parallel inserts must occur
+prepare parallel_write_prep as select length(stringu1) from tenk1;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as execute parallel_write_prep;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Gather (actual rows=0 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+ ->  Create parallel_write
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+deallocate parallel_write_prep;
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select now(), four from tenk1;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Gather (actual rows=0 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+ ->  Create parallel_write
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+drop table parallel_write;
+-- parallel inserts must not occur as the parallelism will not be picked
+-- for select part because of the parallel unsafe function
+create sequence parallel_write_sequence;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select nextval('parallel_write_sequence'), four from tenk1;
+                  QUERY PLAN                   
+-----------------------------------------------
+ Seq Scan on tenk1 (actual rows=10000 loops=1)
+(1 row)
+
+select count(*) from parallel_write;
+ count 
+-------
+ 10000
+(1 row)
+
+drop table parallel_write;
+drop sequence parallel_write_sequence;
+-- parallel inserts must occur, as there is init plan that gets executed by
+-- each parallel worker
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select two col1,
+    (select two from (select * from tenk2) as tt limit 1) col2
+    from tenk1  where tenk1.four = 3;
+                               QUERY PLAN                               
+------------------------------------------------------------------------
+ Gather (actual rows=0 loops=1)
+   Workers Planned: 4
+   Params Evaluated: $1
+   Workers Launched: 3
+ ->  Create parallel_write
+   InitPlan 1 (returns $1)
+     ->  Limit (actual rows=1 loops=1)
+           ->  Gather (actual rows=1 loops=1)
+                 Workers Planned: 4
+                 Workers Launched: 4
+                 ->  Parallel Seq Scan on tenk2 (actual rows=1 loops=5)
+   ->  Parallel Seq Scan on tenk1 (actual rows=625 loops=4)
+         Filter: (four = 3)
+         Rows Removed by Filter: 1875
+(14 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+  2500
+(1 row)
+
+drop table parallel_write;
+-- parallel inserts must not occur, as there is sub plan that gets executed by
+-- the Gather node in leader
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select two col1,
+    (select tenk1.two from generate_series(1,1)) col2
+    from tenk1  where tenk1.four = 3;
+                             QUERY PLAN                              
+---------------------------------------------------------------------
+ Gather (actual rows=2500 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+   ->  Parallel Seq Scan on tenk1 (actual rows=500 loops=5)
+         Filter: (four = 3)
+         Rows Removed by Filter: 1500
+   SubPlan 1
+     ->  Function Scan on generate_series (actual rows=1 loops=2500)
+(8 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+  2500
+(1 row)
+
+drop table parallel_write;
+create table temp1(col1) as select * from generate_series(1,5);
+create table temp2(col2) as select * from temp1;
+create table temp3(col3) as select * from temp1;
+-- parallel inserts must not occur, as there is a limit clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select * from temp1 limit 4;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Limit (actual rows=4 loops=1)
+   ->  Gather (actual rows=4 loops=1)
+         Workers Planned: 3
+         Workers Launched: 3
+         ->  Parallel Seq Scan on temp1 (actual rows=1 loops=4)
+(5 rows)
+
+drop table parallel_write;
+-- parallel inserts must not occur, as there is an order by clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select * from temp1 order by 1;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Gather Merge (actual rows=5 loops=1)
+   Workers Planned: 3
+   Workers Launched: 3
+   ->  Sort (actual rows=1 loops=4)
+         Sort Key: col1
+         Sort Method: quicksort  Memory: 25kB
+         Worker 0:  Sort Method: quicksort  Memory: 25kB
+         Worker 1:  Sort Method: quicksort  Memory: 25kB
+         Worker 2:  Sort Method: quicksort  Memory: 25kB
+         ->  Parallel Seq Scan on temp1 (actual rows=1 loops=4)
+(10 rows)
+
+drop table parallel_write;
+-- parallel inserts must not occur, as there is an order by clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select * from temp1 order by 1;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Gather Merge (actual rows=5 loops=1)
+   Workers Planned: 3
+   Workers Launched: 3
+   ->  Sort (actual rows=1 loops=4)
+         Sort Key: col1
+         Sort Method: quicksort  Memory: 25kB
+         Worker 0:  Sort Method: quicksort  Memory: 25kB
+         Worker 1:  Sort Method: quicksort  Memory: 25kB
+         Worker 2:  Sort Method: quicksort  Memory: 25kB
+         ->  Parallel Seq Scan on temp1 (actual rows=1 loops=4)
+(10 rows)
+
+drop table parallel_write;
+-- parallel inserts must not occur, as there is a distinct clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select distinct * from temp1;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ HashAggregate (actual rows=5 loops=1)
+   Group Key: col1
+   Batches: 1  Memory Usage: 40kB
+   ->  Gather (actual rows=5 loops=1)
+         Workers Planned: 3
+         Workers Launched: 3
+         ->  Parallel Seq Scan on temp1 (actual rows=1 loops=4)
+(7 rows)
+
+drop table parallel_write;
+-- parallel inserts must not occur, as there is an aggregate and group clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select count(*) from temp1 group by col1;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Finalize HashAggregate (actual rows=5 loops=1)
+   Group Key: col1
+   Batches: 1  Memory Usage: 40kB
+   ->  Gather (actual rows=5 loops=1)
+         Workers Planned: 3
+         Workers Launched: 3
+         ->  Partial HashAggregate (actual rows=1 loops=4)
+               Group Key: col1
+               Batches: 1  Memory Usage: 40kB
+               Worker 0:  Batches: 1  Memory Usage: 40kB
+               Worker 1:  Batches: 1  Memory Usage: 40kB
+               Worker 2:  Batches: 1  Memory Usage: 40kB
+               ->  Parallel Seq Scan on temp1 (actual rows=1 loops=4)
+(13 rows)
+
+drop table parallel_write;
+-- parallel inserts must not occur, as there is an aggregate, group and having
+-- clauses
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select count(col1), (select col3 from
+        (select * from temp3) as tt limit 1) col4 from temp1, temp2
+    where temp1.col1 = temp2.col2 group by col4 having count(col1) > 0;
+                                    QUERY PLAN                                    
+----------------------------------------------------------------------------------
+ Finalize GroupAggregate (actual rows=1 loops=1)
+   Group Key: ($1)
+   Filter: (count(temp1.col1) > 0)
+   InitPlan 1 (returns $1)
+     ->  Limit (actual rows=1 loops=1)
+           ->  Gather (actual rows=1 loops=1)
+                 Workers Planned: 3
+                 Workers Launched: 3
+                 ->  Parallel Seq Scan on temp3 (actual rows=0 loops=4)
+   ->  Gather (actual rows=1 loops=1)
+         Workers Planned: 3
+         Params Evaluated: $1
+         Workers Launched: 3
+         ->  Partial GroupAggregate (actual rows=0 loops=4)
+               Group Key: $1
+               ->  Parallel Hash Join (actual rows=1 loops=4)
+                     Hash Cond: (temp1.col1 = temp2.col2)
+                     ->  Parallel Seq Scan on temp1 (actual rows=5 loops=1)
+                     ->  Parallel Hash (actual rows=1 loops=4)
+                           Buckets: 4096  Batches: 1  Memory Usage: 64kB
+                           ->  Parallel Seq Scan on temp2 (actual rows=5 loops=1)
+(21 rows)
+
+drop table parallel_write;
+-- parallel inserts must not occur, as there is a window function
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select avg(col1) OVER (PARTITION BY col1) from temp1;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ WindowAgg (actual rows=5 loops=1)
+   ->  Sort (actual rows=5 loops=1)
+         Sort Key: col1
+         Sort Method: quicksort  Memory: 25kB
+         ->  Gather (actual rows=5 loops=1)
+               Workers Planned: 3
+               Workers Launched: 3
+               ->  Parallel Seq Scan on temp1 (actual rows=1 loops=4)
+(8 rows)
+
+drop table parallel_write;
+-- nested loop join is the top node under which Gather node exists, so parallel
+-- inserts must not occur
+set enable_nestloop to on;
+set enable_mergejoin to off;
+set enable_hashjoin to off;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select * from temp1, temp2  where temp1.col1 = temp2.col2;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Nested Loop (actual rows=5 loops=1)
+   Join Filter: (temp1.col1 = temp2.col2)
+   Rows Removed by Join Filter: 20
+   ->  Gather (actual rows=5 loops=1)
+         Workers Planned: 3
+         Workers Launched: 3
+         ->  Parallel Seq Scan on temp1 (actual rows=1 loops=4)
+   ->  Materialize (actual rows=5 loops=5)
+         ->  Gather (actual rows=5 loops=1)
+               Workers Planned: 3
+               Workers Launched: 3
+               ->  Parallel Seq Scan on temp2 (actual rows=1 loops=4)
+(12 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+     5
+(1 row)
+
+drop table parallel_write;
+-- though the top node is Gather under which there exists parallel unsafe merge
+-- join node, so parallel inserts must not occur
+set enable_nestloop to off;
+set enable_mergejoin to on;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select * from temp1, temp2  where temp1.col1 = temp2.col2;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather (actual rows=5 loops=1)
+   Workers Planned: 3
+   Workers Launched: 3
+   ->  Merge Join (actual rows=1 loops=4)
+         Merge Cond: (temp1.col1 = temp2.col2)
+         ->  Sort (actual rows=1 loops=4)
+               Sort Key: temp1.col1
+               Sort Method: quicksort  Memory: 25kB
+               Worker 0:  Sort Method: quicksort  Memory: 25kB
+               Worker 1:  Sort Method: quicksort  Memory: 25kB
+               Worker 2:  Sort Method: quicksort  Memory: 25kB
+               ->  Parallel Seq Scan on temp1 (actual rows=1 loops=4)
+         ->  Sort (actual rows=5 loops=1)
+               Sort Key: temp2.col2
+               Sort Method: quicksort  Memory: 25kB
+               ->  Seq Scan on temp2 (actual rows=5 loops=1)
+(16 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+     5
+(1 row)
+
+drop table parallel_write;
+-- parallel hash join happens under Gather node, so parallel inserts must occur
+set enable_mergejoin to off;
+set enable_hashjoin to on;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select * from temp1, temp2  where temp1.col1 = temp2.col2;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather (actual rows=0 loops=1)
+   Workers Planned: 3
+   Workers Launched: 3
+ ->  Create parallel_write
+   ->  Parallel Hash Join (actual rows=1 loops=4)
+         Hash Cond: (temp1.col1 = temp2.col2)
+         ->  Parallel Seq Scan on temp1 (actual rows=5 loops=1)
+         ->  Parallel Hash (actual rows=1 loops=4)
+               Buckets: 4096  Batches: 1  Memory Usage: 64kB
+               ->  Parallel Seq Scan on temp2 (actual rows=5 loops=1)
+(10 rows)
+
+select count(*) from parallel_write;
+ count 
+-------
+     5
+(1 row)
+
+drop table parallel_write;
+reset enable_nestloop;
+reset enable_mergejoin;
+reset enable_hashjoin;
+drop table temp1;
+drop table temp2;
+drop table temp3;
 rollback;
diff --git a/src/test/regress/sql/write_parallel.sql b/src/test/regress/sql/write_parallel.sql
index 78b479cedf..6ee80ebd78 100644
--- a/src/test/regress/sql/write_parallel.sql
+++ b/src/test/regress/sql/write_parallel.sql
@@ -39,4 +39,184 @@ explain (costs off) create table parallel_write as execute prep_stmt;
 create table parallel_write as execute prep_stmt;
 drop table parallel_write;
 
+--
+-- Test parallel inserts in create table as/select into/create materialized
+-- view.
+--
+
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select length(stringu1) from tenk1;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel inserts must not occur as the table is temporary
+explain (costs off, analyze on, timing off, summary off)
+create temporary table parallel_write as select length(stringu1) from tenk1;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create unlogged table parallel_write as select length(stringu1) from tenk1;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into parallel_write from tenk1;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel inserts must not occur as the table is temporary
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into temporary parallel_write from tenk1;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into unlogged parallel_write from tenk1;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel inserts must not occur as the parallelism will not be picked
+-- for select part because of for update clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select length(stringu1) from tenk1 for update;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create materialized view parallel_mat_view as
+    select length(stringu1) from tenk1;
+select count(*) from parallel_mat_view;
+drop materialized view parallel_mat_view;
+
+-- parallel inserts must occur
+prepare parallel_write_prep as select length(stringu1) from tenk1;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as execute parallel_write_prep;
+select count(*) from parallel_write;
+deallocate parallel_write_prep;
+drop table parallel_write;
+
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select now(), four from tenk1;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel inserts must not occur as the parallelism will not be picked
+-- for select part because of the parallel unsafe function
+create sequence parallel_write_sequence;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select nextval('parallel_write_sequence'), four from tenk1;
+select count(*) from parallel_write;
+drop table parallel_write;
+drop sequence parallel_write_sequence;
+
+-- parallel inserts must occur, as there is init plan that gets executed by
+-- each parallel worker
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select two col1,
+    (select two from (select * from tenk2) as tt limit 1) col2
+    from tenk1  where tenk1.four = 3;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel inserts must not occur, as there is sub plan that gets executed by
+-- the Gather node in leader
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select two col1,
+    (select tenk1.two from generate_series(1,1)) col2
+    from tenk1  where tenk1.four = 3;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+create table temp1(col1) as select * from generate_series(1,5);
+create table temp2(col2) as select * from temp1;
+create table temp3(col3) as select * from temp1;
+
+-- parallel inserts must not occur, as there is a limit clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select * from temp1 limit 4;
+drop table parallel_write;
+
+-- parallel inserts must not occur, as there is an order by clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select * from temp1 order by 1;
+drop table parallel_write;
+
+-- parallel inserts must not occur, as there is an order by clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select * from temp1 order by 1;
+drop table parallel_write;
+
+-- parallel inserts must not occur, as there is a distinct clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select distinct * from temp1;
+drop table parallel_write;
+
+-- parallel inserts must not occur, as there is an aggregate and group clause
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select count(*) from temp1 group by col1;
+drop table parallel_write;
+
+-- parallel inserts must not occur, as there is an aggregate, group and having
+-- clauses
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select count(col1), (select col3 from
+        (select * from temp3) as tt limit 1) col4 from temp1, temp2
+    where temp1.col1 = temp2.col2 group by col4 having count(col1) > 0;
+drop table parallel_write;
+
+-- parallel inserts must not occur, as there is a window function
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select avg(col1) OVER (PARTITION BY col1) from temp1;
+drop table parallel_write;
+
+-- nested loop join is the top node under which Gather node exists, so parallel
+-- inserts must not occur
+set enable_nestloop to on;
+set enable_mergejoin to off;
+set enable_hashjoin to off;
+
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select * from temp1, temp2  where temp1.col1 = temp2.col2;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- though the top node is Gather under which there exists parallel unsafe merge
+-- join node, so parallel inserts must not occur
+set enable_nestloop to off;
+set enable_mergejoin to on;
+
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select * from temp1, temp2  where temp1.col1 = temp2.col2;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+-- parallel hash join happens under Gather node, so parallel inserts must occur
+set enable_mergejoin to off;
+set enable_hashjoin to on;
+
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+    select * from temp1, temp2  where temp1.col1 = temp2.col2;
+select count(*) from parallel_write;
+drop table parallel_write;
+
+reset enable_nestloop;
+reset enable_mergejoin;
+reset enable_hashjoin;
+drop table temp1;
+drop table temp2;
+drop table temp3;
 rollback;
-- 
2.25.1

From 3bf4f3aee0f6fd9d0f7b29602113e97f35ef6a6f Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Wed, 9 Dec 2020 18:31:55 +0530
Subject: [PATCH v10] Tuple Cost Adjustment for Parallel Inserts in CTAS

---
 src/backend/commands/createas.c       |  9 ++++
 src/backend/commands/explain.c        | 10 +++++
 src/backend/optimizer/path/costsize.c | 19 ++++++++-
 src/backend/optimizer/plan/planner.c  | 61 +++++++++++++++++++++++++++
 src/include/commands/createas.h       | 16 +++++++
 src/include/nodes/parsenodes.h        |  1 +
 6 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 20f59cc2b8..eee8d19259 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -316,10 +316,19 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 		query = linitial_node(Query, rewritten);
 		Assert(query->commandType == CMD_SELECT);
 
+		/*
+		 * Indication to the planner that the SELECT is from CTAS so that it
+		 * can adjust the parallel tuple cost if possible.
+		 */
+		if (IsParallelInsertInCTASAllowed(into, NULL))
+			query->CTASParallelInsInfo |= CTAS_PARALLEL_INS_SELECT;
+
 		/* plan the query */
 		plan = pg_plan_query(query, pstate->p_sourcetext,
 							 CURSOR_OPT_PARALLEL_OK, params);
 
+		query->CTASParallelInsInfo = CTAS_PARALLEL_INS_UNDEF;
+
 		/*
 		 * Use a snapshot with an updated command ID to ensure this query sees
 		 * results of any previously executed queries.  (This could only
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 03ac29cd64..8bd231a0c3 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -387,9 +387,19 @@ ExplainOneQuery(Query *query, int cursorOptions,
 			bufusage_start = pgBufferUsage;
 		INSTR_TIME_SET_CURRENT(planstart);
 
+		/*
+		 * Indication to the planner that the SELECT is from CTAS so that it
+		 * can adjust the parallel tuple cost if possible.
+		 */
+		if (IsParallelInsertInCTASAllowed(into, NULL))
+			query->CTASParallelInsInfo |= CTAS_PARALLEL_INS_SELECT;
+
 		/* plan the query */
 		plan = pg_plan_query(query, queryString, cursorOptions, params);
 
+		if (into)
+			query->CTASParallelInsInfo &= CTAS_PARALLEL_INS_UNDEF;
+
 		INSTR_TIME_SET_CURRENT(planduration);
 		INSTR_TIME_SUBTRACT(planduration, planstart);
 
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 22d6935824..3a316f25f1 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -76,6 +76,7 @@
 #include "access/amapi.h"
 #include "access/htup_details.h"
 #include "access/tsmapi.h"
+#include "commands/createas.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeHash.h"
@@ -378,6 +379,7 @@ cost_gather(GatherPath *path, PlannerInfo *root,
 {
 	Cost		startup_cost = 0;
 	Cost		run_cost = 0;
+	bool		ignore_tuple_cost = false;
 
 	/* Mark the path with the correct row estimate */
 	if (rows)
@@ -393,7 +395,22 @@ cost_gather(GatherPath *path, PlannerInfo *root,
 
 	/* Parallel setup and communication cost. */
 	startup_cost += parallel_setup_cost;
-	run_cost += parallel_tuple_cost * path->path.rows;
+
+	/*
+	 * Do not charge the tuple cost when the workers insert in parallel. The
+	 * ignore flag is set in apply_scanjoin_target_to_paths before generating
+	 * the Gather path for the upper level SELECT part of the CTAS.
+	 */
+	if ((root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT) &&
+		(root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_IGN_TUP_COST))
+	{
+		ignore_tuple_cost = true;
+		/* Reset the ignore flag. */
+		root->parse->CTASParallelInsInfo &= ~CTAS_PARALLEL_INS_IGN_TUP_COST;
+	}
+
+	if (!ignore_tuple_cost)
+		run_cost += parallel_tuple_cost * path->path.rows;
 
 	path->path.startup_cost = startup_cost;
 	path->path.total_cost = (startup_cost + run_cost);
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 1a94b58f8b..1041593237 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -28,6 +28,7 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/createas.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "foreign/fdwapi.h"
@@ -7338,6 +7339,45 @@ can_partial_agg(PlannerInfo *root)
 	return true;
 }
 
+/*
+ * ignore_parallel_tuple_cost
+ *
+ * Gather node will not receive any tuples from the workers in case each worker
+ * inserts them in parallel. So, we set a flag to ignore parallel tuple cost by
+ * the Gather path in cost_gather if the SELECT is for CTAS and we are
+ * generating an upper level Gather path.
+ */
+static bool
+ignore_parallel_tuple_cost(PlannerInfo *root)
+{
+	if (root->query_level == 1 &&
+		(root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT))
+	{
+		/*
+		 * In each of the following cases, a parent path will be generated
+		 * above the upper Gather path (in grouping_planner), in which case we
+		 * cannot let parallel inserts happen. So we do not set the ignore
+		 * tuple cost flag.
+		 */
+		if (root->parse->rowMarks ||
+			limit_needed(root->parse) ||
+			root->parse->sortClause ||
+			root->parse->distinctClause ||
+			root->parse->hasWindowFuncs ||
+			root->parse->groupClause ||
+			root->parse->groupingSets ||
+			root->parse->hasAggs ||
+			root->hasHavingQual)
+			return false;
+
+		root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_IGN_TUP_COST;
+
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * apply_scanjoin_target_to_paths
  *
@@ -7557,8 +7597,29 @@ apply_scanjoin_target_to_paths(PlannerInfo *root,
 	 * one of the generated paths may turn out to be the cheapest one.
 	 */
 	if (rel->consider_parallel && !IS_OTHER_REL(rel))
+	{
+		/*
+		 * Set a flag to ignore parallel tuple cost by the Gather path in
+		 * cost_gather if the SELECT is for CTAS and we are generating an upper
+		 * level Gather path.
+		 */
+		bool	ignore = ignore_parallel_tuple_cost(root);
+
 		generate_useful_gather_paths(root, rel, false);
 
+		/*
+		 * Reset the ignore flag, in case we set it but
+		 * generate_useful_gather_paths returned without reaching cost_gather.
+		 */
+		if (ignore &&
+			(root->parse->CTASParallelInsInfo &
+			 CTAS_PARALLEL_INS_IGN_TUP_COST))
+		{
+			root->parse->CTASParallelInsInfo &=
+											~CTAS_PARALLEL_INS_IGN_TUP_COST;
+		}
+	}
+
 	/*
 	 * Reassess which paths are the cheapest, now that we've potentially added
 	 * new Gather (or Gather Merge) and/or Append (or MergeAppend) paths to
diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h
index ab3aab58c5..6e722f0ac0 100644
--- a/src/include/commands/createas.h
+++ b/src/include/commands/createas.h
@@ -39,6 +39,22 @@ typedef struct
 	Oid			object_id;
 } DR_intorel;
 
+/*
+ * Information sent to the planner from CTAS to account for the cost
+ * calculations in cost_gather. We need to do this because no tuples will be
+ * received by the Gather node if the workers insert the tuples in parallel.
+ */
+typedef enum CTASParallelInsertOpt
+{
+	CTAS_PARALLEL_INS_UNDEF = 0, /* undefined */
+	CTAS_PARALLEL_INS_SELECT = 1 << 0, /* set to this before planning */
+	/*
+	 * Set to this while planning for upper Gather path to ignore parallel
+	 * tuple cost in cost_gather.
+	 */
+	CTAS_PARALLEL_INS_IGN_TUP_COST = 1 << 1
+} CTASParallelInsertOpt;
+
 #define IS_CTAS(intoclause) (intoclause && IsA(intoclause, IntoClause))
 #define IS_PARALLEL_CTAS_DEST(dest) (dest && dest->mydest == DestIntoRel && \
 									 IS_CTAS(((DR_intorel *) dest)->into) && \
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index ec14fc2036..b140a42551 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -180,6 +180,7 @@ typedef struct Query
 	 */
 	int			stmt_location;	/* start location, or -1 if unknown */
 	int			stmt_len;		/* length in bytes; 0 means "rest of string" */
+	uint8		CTASParallelInsInfo;	/* parallel insert in CTAS info */
 } Query;
 
 
-- 
2.25.1
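
For readers following the flag handshake between createas.c, planner.c and
costsize.c, here is a minimal standalone sketch of it. It is not part of the
patch. The flag values are copied from the createas.h hunk; the toy_* functions
and the has_upper_clause parameter are hypothetical stand-ins for
ignore_parallel_tuple_cost() and cost_gather(), and a plain uint8_t stands in
for Query->CTASParallelInsInfo.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Flag values copied from the createas.h hunk of the patch. */
enum
{
	CTAS_PARALLEL_INS_UNDEF = 0,
	CTAS_PARALLEL_INS_SELECT = 1 << 0,
	CTAS_PARALLEL_INS_IGN_TUP_COST = 1 << 1
};

/*
 * Hypothetical stand-in for ignore_parallel_tuple_cost(): request that the
 * next Gather costing skip the tuple cost, unless the query has a clause
 * (limit, sort, aggregate, ...) that puts a node above the Gather.
 */
bool
toy_mark_upper_gather(uint8_t *info, bool has_upper_clause)
{
	if (!(*info & CTAS_PARALLEL_INS_SELECT) || has_upper_clause)
		return false;

	*info |= CTAS_PARALLEL_INS_IGN_TUP_COST;
	return true;
}

/*
 * Hypothetical stand-in for the run cost part of cost_gather(): the ignore
 * flag is consumed on first use, so a later Gather path is charged normally.
 */
double
toy_gather_run_cost(uint8_t *info, double tuple_cost, double rows)
{
	if ((*info & CTAS_PARALLEL_INS_SELECT) &&
		(*info & CTAS_PARALLEL_INS_IGN_TUP_COST))
	{
		*info &= (uint8_t) ~CTAS_PARALLEL_INS_IGN_TUP_COST;
		return 0.0;
	}

	return tuple_cost * rows;
}

/* Walks through one planning cycle using the two helpers above. */
void
toy_flag_demo(void)
{
	uint8_t		info = CTAS_PARALLEL_INS_UNDEF;
	bool		marked;
	double		first;
	double		second;

	info |= CTAS_PARALLEL_INS_SELECT;	/* set before planning */
	marked = toy_mark_upper_gather(&info, false);
	first = toy_gather_run_cost(&info, 0.1, 1000.0);
	second = toy_gather_run_cost(&info, 0.1, 1000.0);

	assert(marked);
	assert(first == 0.0);		/* tuple cost skipped once */
	assert(second > 0.0);		/* flag was consumed, cost charged again */

	info &= CTAS_PARALLEL_INS_UNDEF;	/* AND with 0 clears every bit */
	assert(info == 0);
}
```

Calling toy_flag_demo() from a main() exercises the whole cycle. The last step
shows why "&= CTAS_PARALLEL_INS_UNDEF" resets the field: the constant is 0, so
the AND clears all bits rather than masking a single one.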
