From 90ee038cec103a85307711b861b431317f1cd5bf Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 14 Dec 2020 15:16:49 +0530
Subject: [PATCH v11] Tuple Cost Adjustment for Parallel Inserts in CTAS

---
 src/backend/commands/createas.c       | 42 +++++++++++++++++-
 src/backend/commands/explain.c        | 14 ++++--
 src/backend/commands/prepare.c        |  3 +-
 src/backend/optimizer/path/costsize.c | 22 +++++++++-
 src/backend/optimizer/plan/planner.c  | 61 +++++++++++++++++++++++++++
 src/include/commands/createas.h       | 21 ++++++++-
 src/include/commands/explain.h        |  3 +-
 src/include/nodes/parsenodes.h        |  1 +
 8 files changed, 158 insertions(+), 9 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 9e6c8fb2ba..3ffea41ea6 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -316,6 +316,13 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 		query = linitial_node(Query, rewritten);
 		Assert(query->commandType == CMD_SELECT);
 
+		/*
+		 * Indication to the planner that the SELECT is from CTAS so that it
+		 * can adjust the parallel tuple cost if possible.
+		 */
+		if (IsParallelInsertInCTASAllowed(into, NULL, NULL))
+			query->CTASParallelInsInfo |= CTAS_PARALLEL_INS_SELECT;
+
 		/* plan the query */
 		plan = pg_plan_query(query, pstate->p_sourcetext,
 							 CURSOR_OPT_PARALLEL_OK, params);
@@ -344,7 +351,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 		 * into the target table. We need plan state to be initialized by the
 		 * executor to decide whether to allow parallel inserts or not.
 		 */
-		if (IsParallelInsertInCTASAllowed(into, queryDesc))
+		if (IsParallelInsertInCTASAllowed(into, queryDesc,
+										  &query->CTASParallelInsInfo))
 			SetCTASParallelInsertState(queryDesc);
 
 		/* run the plan to completion */
@@ -659,7 +667,8 @@ intorel_destroy(DestReceiver *self)
  * IsParallelInsertInCTASAllowed --- determine whether or not parallel
  * insertion is possible.
  */
-bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc)
+bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc,
+								   uint8 *tuple_cost_flags)
 {
 	if (!IS_CTAS(into))
 		return false;
@@ -678,6 +687,7 @@ bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc)
 	{
 		PlanState  *ps = queryDesc->planstate;
 		bool  		allow;
+		bool		need_to_assert = false;
 
 		/*
 		 * We allow parallel inserts by the workers only if the Gather node has
@@ -690,6 +700,34 @@ bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc)
 		 */
 		allow = ps && IsA(ps, GatherState) && !ps->ps_ProjInfo;
 
+		/*
+		 * It must not happen that cost_gather ignored the parallel tuple
+		 * cost and yet we do not allow parallel inserts here. The check is
+		 * needed only when the top node is a GatherState, because the
+		 * purpose of the assertion is to catch the case where the planner
+		 * was told to ignore the parallel tuple cost (in anticipation of
+		 * parallel inserts), chose a parallel plan as a result, and we
+		 * then find here that parallel inserts cannot actually be
+		 * performed.
+		 */
+		if (!allow && tuple_cost_flags && ps && IsA(ps, GatherState))
+			need_to_assert = true;
+
+		if (need_to_assert)
+		{
+			/*
+			 * If the planner ignored the parallel tuple cost only in cases
+			 * where parallel inserts are really possible, this assertion
+			 * cannot fail. A failure means the planner may have chosen this
+			 * parallel plan only because we made it ignore the parallel
+			 * tuple cost.
+			 */
+			Assert(!(*tuple_cost_flags & CTAS_PARALLEL_INS_TUP_COST_IGNORED));
+		}
+
+		if (tuple_cost_flags)
+			*tuple_cost_flags = CTAS_PARALLEL_INS_UNDEF;
+
 		return allow;
 	}
 
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 03ac29cd64..d0152deba7 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -387,6 +387,13 @@ ExplainOneQuery(Query *query, int cursorOptions,
 			bufusage_start = pgBufferUsage;
 		INSTR_TIME_SET_CURRENT(planstart);
 
+		/*
+		 * Indication to the planner that the SELECT is from CTAS so that it
+		 * can adjust the parallel tuple cost if possible.
+		 */
+		if (IsParallelInsertInCTASAllowed(into, NULL, NULL))
+			query->CTASParallelInsInfo |= CTAS_PARALLEL_INS_SELECT;
+
 		/* plan the query */
 		plan = pg_plan_query(query, queryString, cursorOptions, params);
 
@@ -402,7 +409,8 @@ ExplainOneQuery(Query *query, int cursorOptions,
 
 		/* run it (if needed) and produce output */
 		ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
-					   &planduration, (es->buffers ? &bufusage : NULL));
+					   &planduration, (es->buffers ? &bufusage : NULL),
+					   &query->CTASParallelInsInfo);
 	}
 }
 
@@ -496,7 +504,7 @@ void
 ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 			   const char *queryString, ParamListInfo params,
 			   QueryEnvironment *queryEnv, const instr_time *planduration,
-			   const BufferUsage *bufusage)
+			   const BufferUsage *bufusage, uint8 *ctas_tuple_cost_flags)
 {
 	DestReceiver *dest;
 	QueryDesc  *queryDesc;
@@ -562,7 +570,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 	 * target table. We need plan state to be initialized by the executor to
 	 * decide whether to allow parallel inserts or not.
 	 */
-	if (IsParallelInsertInCTASAllowed(into, queryDesc))
+	if (IsParallelInsertInCTASAllowed(into, queryDesc, ctas_tuple_cost_flags))
 		SetCTASParallelInsertState(queryDesc);
 
 	/* Execute the plan for statistics if asked for */
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 4b18be5b27..12227b6e79 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -674,7 +674,8 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es,
 
 		if (pstmt->commandType != CMD_UTILITY)
 			ExplainOnePlan(pstmt, into, es, query_string, paramLI, queryEnv,
-						   &planduration, (es->buffers ? &bufusage : NULL));
+						   &planduration, (es->buffers ? &bufusage : NULL),
+						   NULL);
 		else
 			ExplainOneUtility(pstmt->utilityStmt, into, es, query_string,
 							  paramLI, queryEnv);
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 22d6935824..800f25903d 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -76,6 +76,7 @@
 #include "access/amapi.h"
 #include "access/htup_details.h"
 #include "access/tsmapi.h"
+#include "commands/createas.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeHash.h"
@@ -378,6 +379,7 @@ cost_gather(GatherPath *path, PlannerInfo *root,
 {
 	Cost		startup_cost = 0;
 	Cost		run_cost = 0;
+	bool		ignore_tuple_cost = false;
 
 	/* Mark the path with the correct row estimate */
 	if (rows)
@@ -393,7 +395,25 @@ cost_gather(GatherPath *path, PlannerInfo *root,
 
 	/* Parallel setup and communication cost. */
 	startup_cost += parallel_setup_cost;
-	run_cost += parallel_tuple_cost * path->path.rows;
+
+	/*
+	 * Do not charge the parallel tuple cost if we intend to have the
+	 * workers perform the inserts in parallel. The ignore flag would have
+	 * been set in apply_scanjoin_target_to_paths before generating the
+	 * Gather path for the upper-level SELECT part of the CTAS.
+	 */
+	if ((root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT) &&
+		(root->parse->CTASParallelInsInfo &
+		 CTAS_PARALLEL_INS_TUP_COST_CAN_IGN))
+	{
+		ignore_tuple_cost = true;
+		root->parse->CTASParallelInsInfo &=
+									~CTAS_PARALLEL_INS_TUP_COST_CAN_IGN;
+		root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_TUP_COST_IGNORED;
+	}
+
+	if (!ignore_tuple_cost)
+		run_cost += parallel_tuple_cost * path->path.rows;
 
 	path->path.startup_cost = startup_cost;
 	path->path.total_cost = (startup_cost + run_cost);
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 1a94b58f8b..d287b6bfbb 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -28,6 +28,7 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/createas.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "foreign/fdwapi.h"
@@ -7338,6 +7339,45 @@ can_partial_agg(PlannerInfo *root)
 	return true;
 }
 
+/*
+ * ignore_parallel_tuple_cost
+ *
+ * If each worker inserts its tuples in parallel, the Gather node will not
+ * receive any tuples from the workers. So, if the SELECT is for CTAS and we
+ * are generating an upper-level Gather path, set a flag telling cost_gather
+ * to ignore the parallel tuple cost. Returns true if the flag was set.
+ */
+static bool
+ignore_parallel_tuple_cost(PlannerInfo *root)
+{
+	if (root->query_level == 1 &&
+		(root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT))
+	{
+		/*
+		 * In each of the following cases, a parent path will be generated
+		 * on top of the upper Gather path (in grouping_planner), in which
+		 * case we cannot let parallel inserts happen. So we do not set the
+		 * ignore-tuple-cost flag.
+		 */
+		if (root->parse->rowMarks ||
+			limit_needed(root->parse) ||
+			root->parse->sortClause ||
+			root->parse->distinctClause ||
+			root->parse->hasWindowFuncs ||
+			root->parse->groupClause ||
+			root->parse->groupingSets ||
+			root->parse->hasAggs ||
+			root->hasHavingQual)
+			return false;
+
+		root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_TUP_COST_CAN_IGN;
+
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * apply_scanjoin_target_to_paths
  *
@@ -7557,8 +7597,29 @@ apply_scanjoin_target_to_paths(PlannerInfo *root,
 	 * one of the generated paths may turn out to be the cheapest one.
 	 */
 	if (rel->consider_parallel && !IS_OTHER_REL(rel))
+	{
+		/*
+		 * Set a flag to ignore parallel tuple cost by the Gather path in
+		 * cost_gather if the SELECT is for CTAS and we are generating an upper
+		 * level Gather path.
+		 */
+		bool	ignore = ignore_parallel_tuple_cost(root);
+
 		generate_useful_gather_paths(root, rel, false);
 
+		/*
+		 * Reset the ignore flag, in case we set it but
+		 * generate_useful_gather_paths returned without reaching cost_gather.
+		 */
+		if (ignore &&
+			(root->parse->CTASParallelInsInfo &
+			 CTAS_PARALLEL_INS_TUP_COST_CAN_IGN))
+		{
+			root->parse->CTASParallelInsInfo &=
+										~CTAS_PARALLEL_INS_TUP_COST_CAN_IGN;
+		}
+	}
+
 	/*
 	 * Reassess which paths are the cheapest, now that we've potentially added
 	 * new Gather (or Gather Merge) and/or Append (or MergeAppend) paths to
diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h
index ab3aab58c5..e01a6152ce 100644
--- a/src/include/commands/createas.h
+++ b/src/include/commands/createas.h
@@ -39,6 +39,24 @@ typedef struct
 	Oid			object_id;
 } DR_intorel;
 
+/*
+ * Information sent to the planner from CTAS to account for the cost
+ * calculations in cost_gather. We need to do this because no tuples will be
+ * received by the Gather node if the workers insert the tuples in parallel.
+ */
+typedef enum CTASParallelInsertOpt
+{
+	CTAS_PARALLEL_INS_UNDEF = 0, /* undefined */
+	CTAS_PARALLEL_INS_SELECT = 1 << 0, /* set to this before planning */
+	/*
+	 * Set to this while planning for upper Gather path to ignore parallel
+	 * tuple cost in cost_gather.
+	 */
+	CTAS_PARALLEL_INS_TUP_COST_CAN_IGN = 1 << 1,
+	/* Set by cost_gather once it has ignored the parallel tuple cost. */
+	CTAS_PARALLEL_INS_TUP_COST_IGNORED = 1 << 2
+} CTASParallelInsertOpt;
+
 #define IS_CTAS(intoclause) (intoclause && IsA(intoclause, IntoClause))
 #define IS_PARALLEL_CTAS_DEST(dest) (dest && dest->mydest == DestIntoRel && \
 									 IS_CTAS(((DR_intorel *) dest)->into) && \
@@ -53,7 +71,8 @@ extern int	GetIntoRelEFlags(IntoClause *intoClause);
 extern DestReceiver *CreateIntoRelDestReceiver(IntoClause *intoClause);
 
 extern bool IsParallelInsertInCTASAllowed(IntoClause *into,
-										  QueryDesc *queryDesc);
+										  QueryDesc *queryDesc,
+										  uint8 *tuple_cost_flags);
 
 extern void SetCTASParallelInsertState(QueryDesc *queryDesc);
 
diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h
index ba661d32a6..1a1806dbf1 100644
--- a/src/include/commands/explain.h
+++ b/src/include/commands/explain.h
@@ -91,7 +91,8 @@ extern void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into,
 						   ExplainState *es, const char *queryString,
 						   ParamListInfo params, QueryEnvironment *queryEnv,
 						   const instr_time *planduration,
-						   const BufferUsage *bufusage);
+						   const BufferUsage *bufusage,
+						   uint8 *ctas_tuple_cost_flags);
 
 extern void ExplainPrintPlan(ExplainState *es, QueryDesc *queryDesc);
 extern void ExplainPrintTriggers(ExplainState *es, QueryDesc *queryDesc);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 48a79a7657..81b148c383 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -180,6 +180,7 @@ typedef struct Query
 	 */
 	int			stmt_location;	/* start location, or -1 if unknown */
 	int			stmt_len;		/* length in bytes; 0 means "rest of string" */
+	uint8 		CTASParallelInsInfo; /* parallel insert in CTAS info */
 } Query;
 
 
-- 
2.25.1

