From 096df7a758b0b7cf00b99be9e4ffadf15ceef535 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <rupiredd@amazon.com>
Date: Sat, 26 Oct 2024 12:35:51 +0000
Subject: [PATCH v24 2/4] Optimize CTAS/CMV/RMV with new multi-inserts table AM

This commit optimizes the following commands for heap AM using new
multi-inserts table AM added by commit <<CHANGE_ME>>:
- CREATE TABLE AS
- CREATE MATERIALIZED VIEW
- REFRESH MATERIALIZED VIEW

Testing shows that performance of CTAS, CMV, RMV is improved by
<<TO_FILL>> respectively on <<TO_FILL>> system.
f
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/20200924024128.kyk3r5g7dnu3fxxx@alap3.anarazel.de
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/commands/createas.c |  60 ++++++++++++++----
 src/backend/commands/matview.c  | 106 +++++++++++++++++++++++++++++---
 src/include/commands/matview.h  |   3 +
 3 files changed, 147 insertions(+), 22 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 68ec122dbf..0affadf404 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -38,6 +38,7 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
+#include "optimizer/optimizer.h"
 #include "rewrite/rewriteHandler.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
@@ -56,6 +57,12 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+
+	/* Table modify state. NULL if multi-inserts isn't supported. */
+	TableModifyState *mstate;
+
+	/* True if SELECT query contains volatile functions */
+	bool		volatile_funcs;
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -313,6 +320,10 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 		plan = pg_plan_query(query, pstate->p_sourcetext,
 							 CURSOR_OPT_PARALLEL_OK, params);
 
+		/* Check if the SELECT query has any volatile functions */
+		((DR_intorel *) dest)->volatile_funcs =
+			contain_volatile_functions_after_planning((Expr *) query);
+
 		/*
 		 * Use a snapshot with an updated command ID to ensure this query sees
 		 * results of any previously executed queries.  (This could only
@@ -548,16 +559,32 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
 	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
+	myState->ti_options = TABLE_INSERT_SKIP_FSM |
+		TABLE_INSERT_BAS_BULKWRITE;
+	myState->mstate = NULL;
+	myState->bistate = NULL;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
-	 * bulk inserts as there are no tuples to insert.
+	 * multi or bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
-	else
-		myState->bistate = NULL;
+	{
+		if (TableModifyIsMultiInsertsSupported(myState->rel,
+											   myState->volatile_funcs))
+		{
+			myState->mstate = table_modify_begin(myState->rel,
+												 myState->output_cid,
+												 myState->ti_options,
+												 NULL,	/* Multi-insert buffer
+														 * flush callback */
+												 NULL); /* Multi-insert buffer
+														 * flush callback
+														 * context */
+		}
+		else
+			myState->bistate = GetBulkInsertState();
+	}
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -585,11 +612,15 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+
+		if (myState->mstate != NULL)
+			table_modify_buffer_insert(myState->mstate, slot);
+		else
+			table_tuple_insert(myState->rel,
+							   slot,
+							   myState->output_cid,
+							   myState->ti_options,
+							   myState->bistate);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -608,8 +639,13 @@ intorel_shutdown(DestReceiver *self)
 
 	if (!into->skipData)
 	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
+		if (myState->mstate != NULL)
+			table_modify_end(myState->mstate);
+		else
+		{
+			FreeBulkInsertState(myState->bistate);
+			table_finish_bulk_insert(myState->rel, myState->ti_options);
+		}
 	}
 
 	/* close rel, but keep lock until commit */
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 010097873d..fa495ec533 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -30,7 +30,9 @@
 #include "commands/tablespace.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
+#include "foreign/fdwapi.h"
 #include "miscadmin.h"
+#include "optimizer/optimizer.h"
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/lmgr.h"
@@ -51,6 +53,12 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+
+	/* Table modify state. NULL if multi-inserts isn't supported. */
+	TableModifyState *mstate;
+
+	/* True if SELECT query contains volatile functions */
+	bool		volatile_funcs;
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -428,6 +436,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	/* Plan the query which will generate data for the refresh. */
 	plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
 
+	/*
+	 * Check if the stored MATERIALIZED VIEW query has any volatile functions.
+	 */
+	((DR_transientrel *) dest)->volatile_funcs =
+		contain_volatile_functions_after_planning((Expr *) query);
+
 	/*
 	 * Use a snapshot with an updated command ID to ensure this query sees
 	 * results of any previously executed queries.  (This could only matter if
@@ -492,8 +506,26 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->transientrel = transientrel;
 	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->ti_options = TABLE_INSERT_SKIP_FSM |
+		TABLE_INSERT_FROZEN |
+		TABLE_INSERT_BAS_BULKWRITE;
+	myState->bistate = NULL;
+	myState->mstate = NULL;
+
+	/* Set up the state for multi or bulk inserts */
+	if (TableModifyIsMultiInsertsSupported(myState->transientrel,
+										   myState->volatile_funcs))
+	{
+		myState->mstate = table_modify_begin(myState->transientrel,
+											 myState->output_cid,
+											 myState->ti_options,
+											 NULL,	/* Multi-insert buffer
+													 * flush callback */
+											 NULL); /* Multi-insert buffer
+													 * flush callback context */
+	}
+	else
+		myState->bistate = GetBulkInsertState();
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -519,11 +551,14 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * tuple's xmin), but since we don't do that here...
 	 */
 
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	if (myState->mstate != NULL)
+		table_modify_buffer_insert(myState->mstate, slot);
+	else
+		table_tuple_insert(myState->transientrel,
+						   slot,
+						   myState->output_cid,
+						   myState->ti_options,
+						   myState->bistate);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -538,9 +573,13 @@ transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	if (myState->mstate != NULL)
+		table_modify_end(myState->mstate);
+	else
+	{
+		FreeBulkInsertState(myState->bistate);
+		table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	}
 
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
@@ -984,3 +1023,50 @@ CloseMatViewIncrementalMaintenance(void)
 	matview_maintenance_depth--;
 	Assert(matview_maintenance_depth >= 0);
 }
+
+/*
+ * Check if multi-inserts is supported.
+ *
+ * It's generally more efficient to prepare a bunch of tuples for insertion,
+ * and insert them in one multi-inserts call, than call
+ * table_tuple_insert() separately for every tuple. However, there are a
+ * number of reasons why we might not be able to do this. In general, can't
+ * support multi-inserts in the following cases:
+ *
+ * When there are any BEFORE/INSTEAD OF triggers on the table or any volatile
+ * functions/expressions in the SELECT query. Such triggers or volatile
+ * expressions might query the table we're inserting into and act differently
+ * if the tuples that have already been processed and prepared for insertion
+ * are not there.
+ *
+ * When inserting into partitioned table. For partitioned tables, we may still
+ * be able to perform multi-inserts. However, the possibility of this depends
+ * on which types of triggers exist on the partition. We must disable
+ * multi-inserts if the partition is a foreign table that can't use batching or
+ * it has any before row insert or insert instead triggers (same as we checked
+ * above for the parent table). We really can't know all these unless we start
+ * inserting tuples into the respective partitions. We can have an intermediate
+ * insert state to show the intent to do multi-inserts and later determine if
+ * we can use multi-inserts for the partition being inserted into.
+ *
+ * When inserting into foreign table. For foreign tables, we may still be able
+ * to do multi-inserts if the FDW supports batching.
+ */
+bool
+TableModifyIsMultiInsertsSupported(Relation rel, bool volatile_funcs)
+{
+	if (volatile_funcs)
+		return false;
+
+	/*
+	 * For CREATE TABLE AS, CREATE MATERIALIZED VIEW, REFRESH MATERIALIZED
+	 * VIEW, we really can't have triggers or can't create table as
+	 * partitioned or foreign. So, we will assert.
+	 */
+	Assert(rel->trigdesc == NULL);
+	Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
+	Assert(rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE);
+
+	/* Can support multi-inserts */
+	return true;
+}
diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h
index c8811e8fc7..28abd7b89b 100644
--- a/src/include/commands/matview.h
+++ b/src/include/commands/matview.h
@@ -33,4 +33,7 @@ extern DestReceiver *CreateTransientRelDestReceiver(Oid transientoid);
 
 extern bool MatViewIncrementalMaintenanceIsEnabled(void);
 
+extern bool TableModifyIsMultiInsertsSupported(Relation rel,
+											   bool volatile_funcs);
+
 #endif							/* MATVIEW_H */
-- 
2.40.1

