From d62023fd55756e4914fefd412d9380dadb7b3524 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 9 Nov 2020 15:35:01 +0530
Subject: [PATCH v2] Multi Inserts in CTAS & Refresh Materialized View.

This patch adds multi inserts to

1)Create Table As and Create Materialized View
2)Refresh Materialized View
---
 src/backend/commands/copy.c       |  26 ++-----
 src/backend/commands/createas.c   | 112 ++++++++++++++++++++++++++----
 src/backend/commands/matview.c    |  96 +++++++++++++++++++++++--
 src/backend/executor/execTuples.c |  66 ++++++++++++++++++
 src/include/access/tableam.h      |  15 ++++
 src/include/executor/tuptable.h   |   2 +-
 6 files changed, 278 insertions(+), 39 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 115860a9d4..29fd28051e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -240,18 +240,6 @@ typedef struct
 	uint64		processed;		/* # of tuples processed */
 } DR_copy;
 
-
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list.  Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES		1000
-
 /*
  * Flush buffers if there are >= this many bytes, as counted by the input
  * size, of tuples stored.
@@ -264,11 +252,11 @@ typedef struct
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TupleTableSlot *slots[MAX_MULTI_INSERT_TUPLES]; /* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
 	BulkInsertState bistate;	/* BulkInsertState for this rel */
 	int			nused;			/* number of 'slots' containing tuples */
-	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
+	uint64		linenos[MAX_MULTI_INSERT_TUPLES];	/* Line # of tuple in copy
 												 * stream */
 } CopyMultiInsertBuffer;
 
@@ -2396,7 +2384,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
 	buffer->resultRelInfo = rri;
 	buffer->bistate = GetBulkInsertState();
 	buffer->nused = 0;
@@ -2455,8 +2443,8 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 static inline bool
 CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
 {
-	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
-		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+	if (miinfo->bufferedTuples >= MAX_MULTI_INSERT_TUPLES ||
+		miinfo->bufferedBytes >= MAX_MULTI_INSERT_BUFFERED_BYTES)
 		return true;
 	return false;
 }
@@ -2574,7 +2562,7 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 	FreeBulkInsertState(buffer->bistate);
 
 	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+	for (i = 0; i < MAX_MULTI_INSERT_TUPLES && buffer->slots[i] != NULL; i++)
 		ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
 	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
@@ -2666,7 +2654,7 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 	int			nused = buffer->nused;
 
 	Assert(buffer != NULL);
-	Assert(nused < MAX_BUFFERED_TUPLES);
+	Assert(nused < MAX_MULTI_INSERT_TUPLES);
 
 	if (buffer->slots[nused] == NULL)
 		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index d53ec952d0..c4f91c8b5d 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,13 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+	MemoryContext	mi_context;	/* A temporary memory context for multi insert */
+	/* Buffered slots for a multi insert batch. */
+	TupleTableSlot *mi_slots[MAX_MULTI_INSERT_TUPLES];
+	/* Number of current buffered slots for a multi insert batch. */
+	int				mi_slots_num;
+	/* Total tuple size for a multi insert batch. */
+	int				mi_slots_size;
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -553,6 +560,17 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	myState->output_cid = GetCurrentCommandId(true);
 	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 	myState->bistate = GetBulkInsertState();
+	memset(myState->mi_slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
+	myState->mi_slots_num = 0;
+	myState->mi_slots_size = 0;
+
+	/*
+	 * Create a temporary memory context so that we can reset once per
+	 * multi insert batch.
+	*/
+	myState->mi_context = AllocSetContextCreate(CurrentMemoryContext,
+												"intorel_multi_insert",
+												ALLOCSET_DEFAULT_SIZES);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -562,28 +580,82 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 }
 
 /*
- * intorel_receive --- receive one tuple
+ * intorel_flush_multi_insert --- insert multiple tuples
  */
-static bool
-intorel_receive(TupleTableSlot *slot, DestReceiver *self)
+static void
+intorel_flush_multi_insert(DR_intorel *myState)
 {
-	DR_intorel *myState = (DR_intorel *) self;
+	MemoryContext oldcontext;
+	int           i;
 
-	/*
-	 * Note that the input slot might not be of the type of the target
-	 * relation. That's supported by table_tuple_insert(), but slightly less
-	 * efficient than inserting with the right slot - but the alternative
-	 * would be to copy into a slot of the right type, which would not be
-	 * cheap either. This also doesn't allow accessing per-AM data (say a
-	 * tuple's xmin), but since we don't do that here...
-	 */
+	oldcontext = MemoryContextSwitchTo(myState->mi_context);
 
-	table_tuple_insert(myState->rel,
-					   slot,
+	table_multi_insert(myState->rel,
+					   myState->mi_slots,
+					   myState->mi_slots_num,
 					   myState->output_cid,
 					   myState->ti_options,
 					   myState->bistate);
 
+	MemoryContextReset(myState->mi_context);
+	MemoryContextSwitchTo(oldcontext);
+
+	for (i = 0; i < myState->mi_slots_num; i++)
+		ExecClearTuple(myState->mi_slots[i]);
+
+	myState->mi_slots_num = 0;
+	myState->mi_slots_size = 0;
+}
+
+/*
+ * intorel_receive --- receive one tuple
+ */
+static bool
+intorel_receive(TupleTableSlot *slot, DestReceiver *self)
+{
+	DR_intorel 	*myState = (DR_intorel *) self;
+	TupleTableSlot  *batchslot;
+	Size		sz = 0;
+
+	sz = GetTupleSize(slot, MAX_MULTI_INSERT_BUFFERED_BYTES);
+
+	/* In case the computed tuple size is 0, we go for single inserts. */
+	if (sz != 0)
+	{
+		if (myState->mi_slots[myState->mi_slots_num] == NULL)
+		{
+			batchslot = table_slot_create(myState->rel, NULL);
+			myState->mi_slots[myState->mi_slots_num] = batchslot;
+		}
+		else
+			batchslot = myState->mi_slots[myState->mi_slots_num];
+
+		ExecCopySlot(batchslot, slot);
+
+		myState->mi_slots_num++;
+		myState->mi_slots_size += sz;
+
+		if (myState->mi_slots_num >= MAX_MULTI_INSERT_TUPLES ||
+			myState->mi_slots_size >= MAX_MULTI_INSERT_BUFFERED_BYTES)
+			intorel_flush_multi_insert(myState);
+	}
+	else
+	{
+		/*
+		 * Note that the input slot might not be of the type of the target
+		 * relation. That's supported by table_tuple_insert(), but slightly
+		 * less efficient than inserting with the right slot - but the
+		 * alternative would be to copy into a slot of the right type, which
+		 * would not be cheap either. This also doesn't allow accessing per-AM
+		 * data (say a tuple's xmin), but since we don't do that here...
+		 */
+		table_tuple_insert(myState->rel,
+						   slot,
+						   myState->output_cid,
+						   myState->ti_options,
+						   myState->bistate);
+	}
+
 	/* We know this is a newly created relation, so there are no indexes */
 
 	return true;
@@ -596,11 +668,23 @@ static void
 intorel_shutdown(DestReceiver *self)
 {
 	DR_intorel *myState = (DR_intorel *) self;
+	int			i;
+
+	if (myState->mi_slots_num != 0)
+		intorel_flush_multi_insert(myState);
+
+	for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->mi_slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(myState->mi_slots[i]);
 
 	FreeBulkInsertState(myState->bistate);
 
 	table_finish_bulk_insert(myState->rel, myState->ti_options);
 
+	if (myState->mi_context)
+		MemoryContextDelete(myState->mi_context);
+
+	myState->mi_context = NULL;
+
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
 	myState->rel = NULL;
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index f80a9e96a9..b05bf357ea 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -56,6 +56,13 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+	MemoryContext	mi_context;	/* A temporary memory context for multi insert */
+	/* Buffered slots for a multi insert batch. */
+	TupleTableSlot *mi_slots[MAX_MULTI_INSERT_TUPLES];
+	/* Number of current buffered slots for a multi insert batch. */
+	int				mi_slots_num;
+	/* Total tuple size for a multi insert batch. */
+	int				mi_slots_size;
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -460,6 +467,18 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
 	myState->bistate = GetBulkInsertState();
 
+	memset(myState->mi_slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
+	myState->mi_slots_num = 0;
+	myState->mi_slots_size = 0;
+
+	/*
+	 * Create a temporary memory context so that we can reset once per
+	 * multi insert batch.
+	*/
+	myState->mi_context = AllocSetContextCreate(CurrentMemoryContext,
+												"transientrel_multi_insert",
+												ALLOCSET_DEFAULT_SIZES);
+
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
 	 * This may be harmless, but this function hasn't planned for it.
@@ -467,6 +486,34 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
 }
 
+/*
+ * transientrel_flush_multi_insert --- insert multiple tuples
+ */
+static void
+transientrel_flush_multi_insert(DR_transientrel *myState)
+{
+	MemoryContext oldcontext;
+	int           i;
+
+	oldcontext = MemoryContextSwitchTo(myState->mi_context);
+
+	table_multi_insert(myState->transientrel,
+					   myState->mi_slots,
+					   myState->mi_slots_num,
+					   myState->output_cid,
+					   myState->ti_options,
+					   myState->bistate);
+
+	MemoryContextReset(myState->mi_context);
+	MemoryContextSwitchTo(oldcontext);
+
+	for (i = 0; i < myState->mi_slots_num; i++)
+		ExecClearTuple(myState->mi_slots[i]);
+
+	myState->mi_slots_num = 0;
+	myState->mi_slots_size = 0;
+}
+
 /*
  * transientrel_receive --- receive one tuple
  */
@@ -474,7 +521,33 @@ static bool
 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
+	TupleTableSlot  *batchslot;
+	Size		sz = 0;
+
+	sz = GetTupleSize(slot, MAX_MULTI_INSERT_BUFFERED_BYTES);
 
+	/* In case the computed tuple size is 0, we go for single inserts. */
+	if (sz != 0)
+	{
+		if (myState->mi_slots[myState->mi_slots_num] == NULL)
+		{
+			batchslot = table_slot_create(myState->transientrel, NULL);
+			myState->mi_slots[myState->mi_slots_num] = batchslot;
+		}
+		else
+			batchslot = myState->mi_slots[myState->mi_slots_num];
+
+		ExecCopySlot(batchslot, slot);
+
+		myState->mi_slots_num++;
+		myState->mi_slots_size += sz;
+
+		if (myState->mi_slots_num >= MAX_MULTI_INSERT_TUPLES ||
+			myState->mi_slots_size >= MAX_MULTI_INSERT_BUFFERED_BYTES)
+			transientrel_flush_multi_insert(myState);
+	}
+	else
+	{
 	/*
 	 * Note that the input slot might not be of the type of the target
 	 * relation. That's supported by table_tuple_insert(), but slightly less
@@ -484,11 +557,12 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * tuple's xmin), but since we don't do that here...
 	 */
 
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+		table_tuple_insert(myState->transientrel,
+						   slot,
+						   myState->output_cid,
+						   myState->ti_options,
+						   myState->bistate);
+	}
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -502,11 +576,23 @@ static void
 transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
+	int			i;
+
+	if (myState->mi_slots_num != 0)
+		transientrel_flush_multi_insert(myState);
+
+	for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->mi_slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(myState->mi_slots[i]);
 
 	FreeBulkInsertState(myState->bistate);
 
 	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
 
+	if (myState->mi_context)
+		MemoryContextDelete(myState->mi_context);
+
+	myState->mi_context = NULL;
+
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
 	myState->transientrel = NULL;
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..5ee4135151 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -2318,3 +2318,69 @@ end_tup_output(TupOutputState *tstate)
 	ExecDropSingleTupleTableSlot(tstate->slot);
 	pfree(tstate);
 }
+
+/*
+ * GetTupleSize - Compute the tuple size given a table slot.
+ *
+ * For heap tuple, buffer tuple and minimal tuple slot types return the actual
+ * tuple size that exists. For virtual tuple, the size is calculated as the
+ * slot does not have the tuple size. If the computed size exceeds the given
+ * maxsize for the virtual tuple, this function exits, not investing time in
+ * further unnecessary calculation.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+	Size sz = 0;
+	HeapTuple tuple = NULL;
+
+	if (TTS_IS_HEAPTUPLE(slot))
+		tuple = ((HeapTupleTableSlot *) slot)->tuple;
+	else if(TTS_IS_BUFFERTUPLE(slot))
+		tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+	else if(TTS_IS_MINIMALTUPLE(slot))
+		tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+	else if(TTS_IS_VIRTUAL(slot))
+	{
+		/* Size calculation is inspired from tts_virtual_materialize(). */
+		TupleDesc	desc = slot->tts_tupleDescriptor;
+
+		for (int natt = 0; natt < desc->natts; natt++)
+		{
+			Form_pg_attribute att = TupleDescAttr(desc, natt);
+			Datum		val;
+
+			if (att->attbyval)
+				sz += att->attlen;
+
+			if (slot->tts_isnull[natt])
+				continue;
+
+			val = slot->tts_values[natt];
+
+			if (att->attlen == -1 &&
+				VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz += EOH_get_flat_size(DatumGetEOHP(val));
+			}
+			else
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz = att_addlength_datum(sz, att->attlen, val);
+			}
+
+			/*
+			 * We are not interested in proceeding further if the computed size
+			 * crosses maxsize limit that we are looking for.
+			 */
+			if (maxsize != 0 && sz >= maxsize)
+				break;
+		}
+	}
+
+	if (tuple != NULL && !TTS_IS_VIRTUAL(slot))
+		sz = tuple->t_len;
+
+	return sz;
+}
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..087aabe880 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -140,6 +140,21 @@ typedef struct TM_FailureData
 /* Follow update chain and lock latest version of tuple */
 #define TUPLE_LOCK_FLAG_FIND_LAST_VERSION		(1 << 1)
 
+/*
+ * No more than this many tuples per multi insert buffer
+ *
+ * Caution: Don't make this too big. We could end up with this many multi
+ * insert buffer items stored as a list. Increasing this can cause quadratic
+ * growth in memory requirements during copies into partitioned tables with a
+ * large number of partitions.
+ */
+#define MAX_MULTI_INSERT_TUPLES        1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size of the tuples stored.
+ */
+#define MAX_MULTI_INSERT_BUFFERED_BYTES		65535
 
 /* Typedef for callback function for table_index_build_scan */
 typedef void (*IndexBuildCallback) (Relation index,
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..4336cc8ec8 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -329,7 +329,7 @@ extern Datum ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot);
 extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
 								 int lastAttNum);
 extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
-
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
 
 #ifndef FRONTEND
 
-- 
2.25.1

