On Mon, Apr 5, 2021 at 9:49 AM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> On Wed, Mar 10, 2021 at 10:21 AM Bharath Rupireddy
> <bharath.rupireddyforpostg...@gmail.com> wrote:
> > Attaching the v4 patch set. Please review it further.
>
> Attaching v5 patch set after rebasing onto the latest master.

Another rebase due to conflicts in 0003. Attaching v6 for review.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From 6518212583e24b017375512701d9fefa6de20e42 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Wed, 10 Mar 2021 09:53:48 +0530
Subject: [PATCH v6 1/3] New Table AMs for Multi and Single Inserts

This patch introduces new table access methods for multi and
single inserts, and implements them for the heap AM by moving
the relevant code that currently lives outside the AM into the
new APIs.

The main design goal of these new APIs is to give table AM
developers the flexibility to implement multi insert logic that
suits the underlying storage engine. Currently, the same multi
insert logic (when and how to flush the buffered tuples, how to
compute tuple sizes, and so on) is applied to every storage
engine, without taking the engine's capabilities into account.

These APIs also avoid duplicating multi insert code across
callers (existing COPY, and upcoming CTAS, CREATE/REFRESH MAT
VIEW and INSERT SELECTs), and allow bulk insert state allocation
and deallocation to move inside the APIs.
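
To illustrate, a caller of the multi insert API would roughly do
the following (a sketch only; get_next_slot() is a hypothetical
stand-in for whatever produces the incoming slots):

    TableInsertState *istate;
    TupleTableSlot *slot;

    /* Allocate insert state, with bulk insert state and multi inserts. */
    istate = table_insert_begin(rel, GetCurrentCommandId(true),
                                TABLE_INSERT_SKIP_FSM,
                                true /* alloc_bistate */,
                                true /* is_multi */);

    /* Buffer each incoming tuple; the AM flushes when its limits are hit. */
    while ((slot = get_next_slot()) != NULL)
        table_multi_insert_v2(istate, slot);

    /* Flush any remaining buffered tuples, then clean up. */
    table_multi_insert_flush(istate);
    table_insert_end(istate);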
---
 src/backend/access/heap/heapam.c         | 212 +++++++++++++++++++++++
 src/backend/access/heap/heapam_handler.c |   5 +
 src/backend/access/table/tableamapi.c    |   7 +
 src/backend/executor/execTuples.c        |  83 ++++++++-
 src/include/access/heapam.h              |  49 +++++-
 src/include/access/tableam.h             |  87 ++++++++++
 src/include/executor/tuptable.h          |   1 +
 7 files changed, 438 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 3b435c107d..d8bfe17f22 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -67,6 +67,7 @@
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -2669,6 +2670,217 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * heap_insert_begin - allocate and initialize TableInsertState
+ *
+ * For single inserts, specify is_multi as false; the multi insert state will
+ * then be NULL. For multi inserts, specify is_multi as true; the multi insert
+ * state will be allocated and initialized.
+ *
+ * The other input parameters (relation, command id, options) are common to
+ * both single and multi inserts.
+ */
+TableInsertState *
+heap_insert_begin(Relation rel, CommandId cid, int options, bool is_multi)
+{
+	TableInsertState *state;
+
+	state = palloc(sizeof(TableInsertState));
+	state->rel = rel;
+	state->cid = cid;
+	state->options = options;
+	/* The following fields are only used for multi inserts. */
+	state->mi_slots = NULL;
+	state->mistate = NULL;
+	state->mi_cur_slots = 0;
+	state->flushed = false;
+
+	if (is_multi)
+	{
+		HeapMultiInsertState *mistate;
+
+		mistate = palloc(sizeof(HeapMultiInsertState));
+		state->mi_slots =
+				palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		mistate->max_slots = MAX_BUFFERED_TUPLES;
+		mistate->max_size = MAX_BUFFERED_BYTES;
+		mistate->cur_size = 0;
+		/*
+		 * Create a temporary memory context that we can reset once per
+		 * multi insert batch.
+		 */
+		mistate->context = AllocSetContextCreate(CurrentMemoryContext,
+												 "heap_multi_insert",
+												 ALLOCSET_DEFAULT_SIZES);
+		state->mistate = mistate;
+	}
+
+	return state;
+}
+
+/*
+ * heap_insert_v2 - insert a single tuple into a heap
+ *
+ * Insert the tuple from the slot into the table. This is like heap_insert(),
+ * except that the insertion parameters are carried in the table insert state
+ * structure.
+ */
+void
+heap_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+
+	Assert(state);
+
+	/* Update tuple with table oid */
+	slot->tts_tableOid = RelationGetRelid(state->rel);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	/* Perform insertion, and copy the resulting ItemPointer */
+	heap_insert(state->rel, tuple, state->cid, state->options, state->bistate);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+/*
+ * heap_multi_insert_v2 - insert multiple tuples into a heap
+ *
+ * Copy the tuple from the given slot into one of the buffered slots and
+ * account for its size. Flush, i.e. insert the tuples buffered so far into
+ * the heap, when either all the buffered slots are filled up or the total
+ * tuple size of the currently buffered slots reaches max_size.
+ */
+void
+heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	TupleTableSlot  *batchslot;
+	HeapMultiInsertState *mistate;
+	Size sz;
+
+	Assert(state);
+
+	mistate = (HeapMultiInsertState *) state->mistate;
+
+	Assert(mistate && state->mi_slots);
+
+	/* Reset flush state if previously set. */
+	if (state->flushed)
+	{
+		state->mi_cur_slots = 0;
+		state->flushed = false;
+	}
+
+	Assert(state->mi_cur_slots < mistate->max_slots);
+
+	if (state->mi_slots[state->mi_cur_slots] == NULL)
+		state->mi_slots[state->mi_cur_slots] =
+									table_slot_create(state->rel, NULL);
+
+	batchslot = state->mi_slots[state->mi_cur_slots];
+
+	ExecClearTuple(batchslot);
+	ExecCopySlot(batchslot, slot);
+
+	/*
+	 * Calculate the tuple size after the original slot has been copied,
+	 * because the copied slot's type, and hence the tuple size, may differ.
+	 */
+	sz = GetTupleSize(batchslot, mistate->max_size);
+
+	Assert(sz > 0);
+
+	state->mi_cur_slots++;
+	mistate->cur_size += sz;
+
+	if (state->mi_cur_slots >= mistate->max_slots ||
+		mistate->cur_size >= mistate->max_size)
+		heap_multi_insert_flush(state);
+}
+
+/*
+ * heap_multi_insert_flush - flush buffered tuples, if any, into a heap
+ *
+ * Flush the buffered tuples, indicate to the caller that a flush happened,
+ * and reset the state.
+ */
+void
+heap_multi_insert_flush(TableInsertState *state)
+{
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	Assert(state);
+
+	mistate = (HeapMultiInsertState *) state->mistate;
+
+	Assert(mistate && state->mi_slots && state->mi_cur_slots >= 0 &&
+		   mistate->context);
+
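+	/*
+	 * If the previous call already flushed and no new tuples have been
+	 * buffered since, just reset the flush state and return.
+	 */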
+	if (state->flushed)
+	{
+		state->mi_cur_slots = 0;
+		state->flushed = false;
+		return;
+	}
+
+	oldcontext = MemoryContextSwitchTo(mistate->context);
+	heap_multi_insert(state->rel, state->mi_slots, state->mi_cur_slots,
+					  state->cid, state->options, state->bistate);
+	MemoryContextReset(mistate->context);
+	MemoryContextSwitchTo(oldcontext);
+
+	state->flushed = true;
+	mistate->cur_size = 0;
+}
+
+/*
+ * heap_insert_end - clean up TableInsertState
+ *
+ * For multi inserts, callers must flush any remaining buffered tuples with
+ * heap_multi_insert_flush before calling this function.
+ *
+ * Here, the buffered slots are dropped, the short-lived memory context is
+ * deleted, and mistate and the TableInsertState are freed.
+ */
+void
+heap_insert_end(TableInsertState *state)
+{
+	HeapMultiInsertState *mistate;
+	int i;
+
+	Assert(state);
+
+	mistate = (HeapMultiInsertState *) state->mistate;
+
+	if (!mistate)
+	{
+		pfree(state);
+		return;
+	}
+
+	Assert(state->mi_slots && mistate->context);
+
+	/* Ensure that the buffers have been flushed before. */
+	Assert(state->mi_cur_slots == 0 || state->flushed);
+
+	for (i = 0; i < mistate->max_slots && state->mi_slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(state->mi_slots[i]);
+
+	MemoryContextDelete(mistate->context);
+	pfree(mistate);
+	pfree(state->mi_slots);
+	pfree(state);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index bd5faf0c1f..655de8e6b7 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2558,6 +2558,11 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+	.tuple_insert_begin = heap_insert_begin,
+	.tuple_insert_v2 = heap_insert_v2,
+	.multi_insert_v2 = heap_multi_insert_v2,
+	.multi_insert_flush = heap_multi_insert_flush,
+	.tuple_insert_end = heap_insert_end,
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 325ecdc122..95f1f9b6a0 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -78,6 +78,13 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->tuple_complete_speculative != NULL);
 
 	Assert(routine->multi_insert != NULL);
+
+	Assert(routine->tuple_insert_begin != NULL);
+	Assert(routine->tuple_insert_v2 != NULL);
+	Assert(routine->multi_insert_v2 != NULL);
+	Assert(routine->multi_insert_flush != NULL);
+	Assert(routine->tuple_insert_end != NULL);
+
 	Assert(routine->tuple_delete != NULL);
 	Assert(routine->tuple_update != NULL);
 	Assert(routine->tuple_lock != NULL);
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 73c35df9c9..79ae22455a 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -159,7 +159,11 @@ tts_virtual_materialize(TupleTableSlot *slot)
 	if (TTS_SHOULDFREE(slot))
 		return;
 
-	/* compute size of memory required */
+	/*
+	 * Compute size of memory required. This size calculation code is
+	 * duplicated in GetTupleSize(), so any changes or fixes made here must
+	 * also be applied there.
+	 */
 	for (int natt = 0; natt < desc->natts; natt++)
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, natt);
@@ -1239,6 +1243,83 @@ ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
 	pfree(slot);
 }
 
+/*
+ * GetTupleSize - compute the tuple size given a table slot
+ *
+ * For heap, buffer and minimal tuple slot types, return the stored tuple's
+ * actual size. For a virtual slot, the size has to be calculated, since the
+ * slot does not carry a tuple; if the computed size reaches the given
+ * maxsize, the calculation stops early, as there is no point in going
+ * further.
+ *
+ * Important notes:
+ * 1) The size calculation code for virtual slots is taken from
+ *    tts_virtual_materialize(), so any changes or fixes made there must also
+ *    be applied here.
+ * 2) GetTupleSize() currently handles heap, buffer, minimal and virtual
+ *    slots. If a new slot type is introduced, the related code must be added
+ *    here.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+	Size sz = 0;
+	HeapTuple tuple = NULL;
+
+	if (TTS_IS_HEAPTUPLE(slot))
+		tuple = ((HeapTupleTableSlot *) slot)->tuple;
+	else if (TTS_IS_BUFFERTUPLE(slot))
+		tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+	else if (TTS_IS_MINIMALTUPLE(slot))
+		tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+	else if (TTS_IS_VIRTUAL(slot))
+	{
+		/*
+		 * The size calculation code here is taken from
+		 * tts_virtual_materialize(); any changes or fixes made there must
+		 * also be applied here.
+		 */
+		TupleDesc	desc = slot->tts_tupleDescriptor;
+
+		for (int natt = 0; natt < desc->natts; natt++)
+		{
+			Form_pg_attribute att = TupleDescAttr(desc, natt);
+			Datum		val;
+
+			if (att->attbyval)
+			{
+				/*
+				 * By-value attributes are stored inline in the datum array;
+				 * count their fixed length here and move on, so that the
+				 * generic path below does not add it a second time.
+				 */
+				sz += att->attlen;
+				continue;
+			}
+
+			if (slot->tts_isnull[natt])
+				continue;
+
+			val = slot->tts_values[natt];
+
+			if (att->attlen == -1 &&
+				VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz += EOH_get_flat_size(DatumGetEOHP(val));
+			}
+			else
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz = att_addlength_datum(sz, att->attlen, val);
+			}
+
+			/*
+			 * There is no point in proceeding further once the computed size
+			 * crosses the maxsize limit we were asked to check against.
+			 */
+			if (maxsize != 0 && sz >= maxsize)
+				break;
+		}
+	}
+
+	if (tuple != NULL)
+		sz = tuple->t_len;
+
+	return sz;
+}
 
 /* ----------------------------------------------------------------
  *				  tuple table slot accessor functions
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index bc0936bc2d..da74ab072d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -36,11 +36,26 @@
 #define HEAP_INSERT_NO_LOGICAL	TABLE_INSERT_NO_LOGICAL
 #define HEAP_INSERT_SPECULATIVE 0x0010
 
-typedef struct BulkInsertStateData *BulkInsertState;
 struct TupleTableSlot;
 
 #define MaxLockTupleMode	LockTupleExclusive
 
+/*
+ * No more than this many tuples per single multi insert batch
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in the multi insert buffer. Increasing this can cause quadratic
+ * growth in memory requirements during copies into partitioned tables with a
+ * large number of partitions.
+ */
+#define MAX_BUFFERED_TUPLES		1000
+
+/*
+ * Flush multi insert buffers if there are >= this many bytes, as counted by
+ * the size of the tuples buffered.
+ */
+#define MAX_BUFFERED_BYTES		65535
+
 /*
  * Descriptor for heap table scans.
  */
@@ -93,6 +108,25 @@ typedef enum
 	HEAPTUPLE_DELETE_IN_PROGRESS	/* deleting xact is still in progress */
 } HTSV_Result;
 
+/* Holds multi insert state for the heap access method. */
+typedef struct HeapMultiInsertState
+{
+	/* Switch to short-lived memory context before flushing. */
+	MemoryContext       context;
+	/* Maximum number of slots that can be buffered. */
+	int32               max_slots;
+	/*
+	 * Maximum size (in bytes) of all the tuples that a single batch of
+	 * buffered slots can hold.
+	 */
+	int64               max_size;
+	/*
+	 * Total tuple size (in bytes) of the slots that are currently buffered.
+	 * Flush the buffered slots when cur_size >= max_size.
+	 */
+	int64               cur_size;
+} HeapMultiInsertState;
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -134,15 +168,20 @@ extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation,
 
 extern void heap_get_latest_tid(TableScanDesc scan, ItemPointer tid);
 
-extern BulkInsertState GetBulkInsertState(void);
-extern void FreeBulkInsertState(BulkInsertState);
-extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
-
 extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 						int options, BulkInsertState bistate);
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableInsertState *heap_insert_begin(Relation rel, CommandId cid,
+										   int options, bool is_multi);
+extern void heap_insert_v2(TableInsertState *state, TupleTableSlot *slot);
+extern void heap_multi_insert_v2(TableInsertState *state,
+								 TupleTableSlot *slot);
+extern void heap_multi_insert_flush(TableInsertState *state);
+extern void heap_insert_end(TableInsertState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 414b6b4d57..2a1470a7b6 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -229,6 +229,32 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Holds table insert state. */
+typedef struct TableInsertState
+{
+	Relation	rel;
+	/* Bulk insert state if requested, otherwise NULL. */
+	struct BulkInsertStateData	*bistate;
+	CommandId	cid;
+	int	options;
+	/* Below members are only used for multi inserts. */
+	/* Array of buffered slots. */
+	TupleTableSlot	**mi_slots;
+	/* Number of slots that are currently buffered. */
+	int32	mi_cur_slots;
+	/*
+	 * Access method specific information such as parameters that are needed
+	 * for buffering and flushing decisions can go here.
+	 */
+	void	*mistate;
+	/*
+	 * Indicates whether the buffered slots have been flushed to the table.
+	 * Callers of the multi insert API use this to insert into indexes or to
+	 * execute AFTER ROW triggers, if any.
+	 */
+	bool	flushed;
+} TableInsertState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -504,6 +530,17 @@ typedef struct TableAmRoutine
 	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
 								 CommandId cid, int options, struct BulkInsertStateData *bistate);
 
+	TableInsertState *(*tuple_insert_begin) (Relation rel, CommandId cid,
+											 int options, bool is_multi);
+
+	void (*tuple_insert_v2) (TableInsertState *state, TupleTableSlot *slot);
+
+	void (*multi_insert_v2) (TableInsertState *state, TupleTableSlot *slot);
+
+	void (*multi_insert_flush) (TableInsertState *state);
+
+	void (*tuple_insert_end) (TableInsertState *state);
+
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -851,6 +888,8 @@ typedef struct TableAmRoutine
 } TableAmRoutine;
 
 
+typedef struct BulkInsertStateData *BulkInsertState;
+
 /* ----------------------------------------------------------------------------
  * Slot functions.
  * ----------------------------------------------------------------------------
@@ -869,6 +908,10 @@ extern const TupleTableSlotOps *table_slot_callbacks(Relation rel);
  */
 extern TupleTableSlot *table_slot_create(Relation rel, List **reglist);
 
+/* Bulk insert state functions. */
+extern BulkInsertState GetBulkInsertState(void);
+extern void FreeBulkInsertState(BulkInsertState);
+extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
 /* ----------------------------------------------------------------------------
  * Table scan functions.
@@ -1430,6 +1473,50 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 								  cid, options, bistate);
 }
 
+static inline TableInsertState *
+table_insert_begin(Relation rel, CommandId cid, int options,
+				   bool alloc_bistate, bool is_multi)
+{
+	TableInsertState *state = rel->rd_tableam->tuple_insert_begin(rel, cid,
+										options, is_multi);
+
+	/* Allocate bulk insert state here, since it's AM independent. */
+	if (alloc_bistate)
+		state->bistate = GetBulkInsertState();
+	else
+		state->bistate = NULL;
+
+	return state;
+}
+
+static inline void
+table_tuple_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->tuple_insert_v2(state, slot);
+}
+
+static inline void
+table_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->multi_insert_v2(state, slot);
+}
+
+static inline void
+table_multi_insert_flush(TableInsertState *state)
+{
+	state->rel->rd_tableam->multi_insert_flush(state);
+}
+
+static inline void
+table_insert_end(TableInsertState *state)
+{
+	/* Deallocate bulk insert state here, since it's AM independent. */
+	if (state->bistate)
+		FreeBulkInsertState(state->bistate);
+
+	state->rel->rd_tableam->tuple_insert_end(state);
+}
+
 /*
  * Delete a tuple.
  *
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index 679e57fbdd..1f59614183 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -330,6 +330,7 @@ extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
 								 int lastAttNum);
 extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
 
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
 
 #ifndef FRONTEND
 
-- 
2.25.1

From d9de92281d7b5c44a6a15994a9a11052149c9981 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Wed, 10 Mar 2021 09:54:59 +0530
Subject: [PATCH v6 2/3] CTAS and REFRESH Mat View With New Multi Insert Table AM

This patch makes CREATE TABLE AS, CREATE MATERIALIZED VIEW and
REFRESH MATERIALIZED VIEW use the new multi insert table access
methods.
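
For reference, the DestReceiver callbacks map onto the new API
roughly as below (a simplified sketch of the intorel_*/
transientrel_* changes in this patch):

    /* startup: allocate insert state with bulk and multi inserts */
    myState->istate = table_insert_begin(rel, GetCurrentCommandId(true),
                                         TABLE_INSERT_SKIP_FSM,
                                         true, true);

    /* receive: buffer one tuple per call */
    table_multi_insert_v2(myState->istate, slot);

    /* shutdown: flush the remaining buffered tuples and clean up */
    table_multi_insert_flush(myState->istate);
    table_insert_end(myState->istate);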
---
 src/backend/commands/createas.c | 49 +++++++++++++++++----------------
 src/backend/commands/matview.c  | 35 ++++++++++++-----------
 2 files changed, 43 insertions(+), 41 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index dce882012e..36ad0ef698 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -58,9 +58,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableInsertState *istate;	/* insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -546,22 +544,26 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	if (is_matview && !into->skipData)
 		SetMatViewPopulatedState(intoRelationDesc, true);
 
-	/*
-	 * Fill private fields of myState for use by later routines
-	 */
-	myState->rel = intoRelationDesc;
-	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
-
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
-	 * bulk inserts as there are no tuples to insert.
+	 * bulk inserts and multi inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+	{
+		myState->istate = table_insert_begin(intoRelationDesc,
+											 GetCurrentCommandId(true),
+											 TABLE_INSERT_SKIP_FSM,
+											 true,
+											 true);
+	}
 	else
-		myState->bistate = NULL;
+		myState->istate = NULL;
+
+	/*
+	 * Fill private fields of myState for use by later routines
+	 */
+	myState->rel = intoRelationDesc;
+	myState->reladdr = intoRelationAddr;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -589,11 +591,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+		table_multi_insert_v2(myState->istate, slot);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -608,12 +606,17 @@ static void
 intorel_shutdown(DestReceiver *self)
 {
 	DR_intorel *myState = (DR_intorel *) self;
-	IntoClause *into = myState->into;
+	int ti_options;
 
-	if (!into->skipData)
+	if (!myState->into->skipData)
 	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
+		ti_options = myState->istate->options;
+
+		table_multi_insert_flush(myState->istate);
+
+		table_insert_end(myState->istate);
+
+		table_finish_bulk_insert(myState->rel, ti_options);
 	}
 
 	/* close rel, but keep lock until commit */
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index c5c25ce11d..9c6b5f8525 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -52,10 +52,7 @@ typedef struct
 	DestReceiver pub;			/* publicly-known function pointers */
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
-	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableInsertState *istate;	/* insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -466,10 +463,11 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	/*
 	 * Fill private fields of myState for use by later routines
 	 */
-	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->istate = table_insert_begin(transientrel,
+										 GetCurrentCommandId(true),
+										 TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN,
+										 true,
+										 true);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -494,12 +492,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_multi_insert_v2(myState->istate, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -513,14 +506,20 @@ static void
 transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
+	int ti_options;
+	Relation transientrel;
+
+	ti_options = myState->istate->options;
+	transientrel = myState->istate->rel;
+
+	table_multi_insert_flush(myState->istate);
 
-	FreeBulkInsertState(myState->bistate);
+	table_insert_end(myState->istate);
 
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_finish_bulk_insert(transientrel, ti_options);
 
 	/* close transientrel, but keep lock until commit */
-	table_close(myState->transientrel, NoLock);
-	myState->transientrel = NULL;
+	table_close(transientrel, NoLock);
 }
 
 /*
-- 
2.25.1

From 26740527f650f6edb70e580d46a4b86124da74e5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 19 Apr 2021 10:02:31 +0530
Subject: [PATCH v6 3/3] COPY With New Multi and Single Insert Table AM

This patch makes the COPY FROM code use the new single and multi
insert table access methods.
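
For reference, the per-relation buffering in CopyFrom now roughly
follows this pattern (a sketch; HandleAfterRowEvents is the helper
introduced in this patch):

    /* set up one TableInsertState per target relation, on demand */
    buffer->istate = table_insert_begin(rri->ri_RelationDesc, mycid,
                                        ti_options, true, true);

    /* per input line: buffer the tuple; the AM flushes when full */
    table_multi_insert_v2(buffer->istate, myslot);
    if (buffer->istate->flushed)
        HandleAfterRowEvents(rri, estate, cstate,
                             buffer->istate->mi_cur_slots);

    /* at end of COPY: flush leftovers, then tear down each buffer */
    table_multi_insert_flush(buffer->istate);
    table_insert_end(buffer->istate);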
---
 src/backend/commands/copyfrom.c | 464 ++++++++++----------------------
 1 file changed, 144 insertions(+), 320 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 40a54ad0bd..0117413943 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -53,54 +53,17 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list.  Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES		1000
-
-/*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
- */
-#define MAX_BUFFERED_BYTES		65535
-
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
-	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel */
-	int			nused;			/* number of 'slots' containing tuples */
-	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
-												 * stream */
+	TableInsertState *istate;
+	/* Line # of tuple in copy stream. */
+	uint64          linenos[MAX_BUFFERED_TUPLES];
 } CopyMultiInsertBuffer;
 
-/*
- * Stores one or many CopyMultiInsertBuffers and details about the size and
- * number of tuples which are stored in them.  This allows multiple buffers to
- * exist at once when COPYing into a partitioned table.
- */
-typedef struct CopyMultiInsertInfo
-{
-	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
-	int			bufferedTuples; /* number of tuples buffered over all buffers */
-	int			bufferedBytes;	/* number of bytes from all buffered tuples */
-	CopyFromState cstate;		/* Copy state for this CopyMultiInsertInfo */
-	EState	   *estate;			/* Executor state used for COPY */
-	CommandId	mycid;			/* Command Id used for COPY */
-	int			ti_options;		/* table insert options */
-} CopyMultiInsertInfo;
-
-
 /* non-export function prototypes */
 static char *limit_printout_length(const char *str);
 
@@ -207,108 +170,33 @@ limit_printout_length(const char *str)
 	return res;
 }
 
-/*
- * Allocate memory and initialize a new CopyMultiInsertBuffer for this
- * ResultRelInfo.
- */
-static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
-
-	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
-	buffer->resultRelInfo = rri;
-	buffer->bistate = GetBulkInsertState();
-	buffer->nused = 0;
-
-	return buffer;
-}
-
-/*
- * Make a new buffer for this ResultRelInfo.
- */
-static inline void
-CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
+static void
+InitCopyMultiInsertBufferInfo(List **mirri, ResultRelInfo *rri,
+							   CommandId mycid, int ti_options)
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
-
+	buffer = (CopyMultiInsertBuffer *) palloc0(sizeof(CopyMultiInsertBuffer));
+	buffer->istate = table_insert_begin(rri->ri_RelationDesc, mycid,
+										ti_options, true, true);
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
-	/* Record that we're tracking this buffer */
-	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+	*mirri = lappend(*mirri, rri);
 }
 
 /*
- * Initialize an already allocated CopyMultiInsertInfo.
- *
- * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
- * for that table.
+ * After the buffered tuples are flushed to the table, insert the index
+ * entries and run AFTER ROW INSERT triggers for them, if any.
  */
 static void
-CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						CopyFromState cstate, EState *estate, CommandId mycid,
-						int ti_options)
+HandleAfterRowEvents(ResultRelInfo *rri, EState *estate, CopyFromState cstate,
+					 int32 cur_slots)
 {
-	miinfo->multiInsertBuffers = NIL;
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
-	miinfo->cstate = cstate;
-	miinfo->estate = estate;
-	miinfo->mycid = mycid;
-	miinfo->ti_options = ti_options;
-
-	/*
-	 * Only setup the buffer when not dealing with a partitioned table.
-	 * Buffers for partitioned tables will just be setup when we need to send
-	 * tuples their way for the first time.
-	 */
-	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
-}
-
-/*
- * Returns true if the buffers are full
- */
-static inline bool
-CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
-{
-	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
-		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
-		return true;
-	return false;
-}
-
-/*
- * Returns true if we have no buffered tuples
- */
-static inline bool
-CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
-{
-	return miinfo->bufferedTuples == 0;
-}
-
-/*
- * Write the tuples stored in 'buffer' out to the table.
- */
-static inline void
-CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
-						   CopyMultiInsertBuffer *buffer)
-{
-	MemoryContext oldcontext;
-	int			i;
-	uint64		save_cur_lineno;
-	CopyFromState cstate = miinfo->cstate;
-	EState	   *estate = miinfo->estate;
-	CommandId	mycid = miinfo->mycid;
-	int			ti_options = miinfo->ti_options;
-	bool		line_buf_valid = cstate->line_buf_valid;
-	int			nused = buffer->nused;
-	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
-	TupleTableSlot **slots = buffer->slots;
+	int i;
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+	TableInsertState *istate = buffer->istate;
+	uint64  save_cur_lineno;
+	bool    line_buf_valid = cstate->line_buf_valid;
 
 	/*
 	 * Print error context information correctly, if one of the operations
@@ -317,36 +205,27 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	cstate->line_buf_valid = false;
 	save_cur_lineno = cstate->cur_lineno;
 
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
+	for (i = 0; i < cur_slots; i++)
 	{
 		/*
 		 * If there are any indexes, update them for all the inserted tuples,
 		 * and run AFTER ROW INSERT triggers.
 		 */
-		if (resultRelInfo->ri_NumIndices > 0)
+		if (rri->ri_NumIndices > 0)
 		{
 			List	   *recheckIndexes;
 
 			cstate->cur_lineno = buffer->linenos[i];
-			recheckIndexes =
-				ExecInsertIndexTuples(resultRelInfo,
-									  buffer->slots[i], estate, false, false,
-									  NULL, NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
+
+			recheckIndexes = ExecInsertIndexTuples(rri,
+												   istate->mi_slots[i], estate,
+												   false,
+												   false,
+												   NULL,
+												   NIL);
+
+			ExecARInsertTriggers(estate, rri,
+								 istate->mi_slots[i], recheckIndexes,
 								 cstate->transition_capture);
 			list_free(recheckIndexes);
 		}
@@ -355,79 +234,69 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 * There's no indexes, but see if we need to run AFTER ROW INSERT
 		 * triggers anyway.
 		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		else if (rri->ri_TrigDesc != NULL &&
+				 (rri->ri_TrigDesc->trig_insert_after_row ||
+				 rri->ri_TrigDesc->trig_insert_new_table))
 		{
 			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
+			ExecARInsertTriggers(estate,
+								 rri,
+								 istate->mi_slots[i],
+								 NIL,
+								 cstate->transition_capture);
 		}
-
-		ExecClearTuple(slots[i]);
 	}
 
-	/* Mark that all slots are free */
-	buffer->nused = 0;
-
 	/* reset cur_lineno and line_buf_valid to what they were */
 	cstate->line_buf_valid = line_buf_valid;
 	cstate->cur_lineno = save_cur_lineno;
 }
 
 /*
- * Drop used slots and free member for this buffer.
- *
- * The buffer must be flushed before cleanup.
+ * Store the tuple from the incoming slot into one of the buffered slots.
  */
-static inline void
-CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
-							 CopyMultiInsertBuffer *buffer)
+static void
+CopyMultiInsertBufferTuple(ResultRelInfo *rri, TupleTableSlot *slot,
+						   CopyFromState cstate, EState *estate)
 {
-	int			i;
-
-	/* Ensure buffer was flushed */
-	Assert(buffer->nused == 0);
-
-	/* Remove back-link to ourself */
-	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
-
-	FreeBulkInsertState(buffer->bistate);
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+	TableInsertState *istate = buffer->istate;
 
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	table_multi_insert_v2(istate, slot);
 
-	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-							 miinfo->ti_options);
+	buffer->linenos[istate->mi_cur_slots - 1] = cstate->cur_lineno;
 
-	pfree(buffer);
+	if (istate->flushed)
+		HandleAfterRowEvents(rri, estate, cstate, istate->mi_cur_slots);
 }
 
 /*
- * Write out all stored tuples in all buffers out to the tables.
- *
- * Once flushed we also trim the tracked buffers list down to size by removing
- * the buffers created earliest first.
- *
- * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
- * used.  When cleaning up old buffers we'll never remove the one for
- * 'curr_rri'.
+ * Flush the buffered slots' tuples into the table.
  */
-static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+static void
+CopyMultiInsertFlushBuffers(List **mirri, ResultRelInfo *curr_rri,
+							CopyFromState cstate, EState *estate)
 {
 	ListCell   *lc;
+	ResultRelInfo *rri;
+	CopyMultiInsertBuffer *buffer;
+	TableInsertState *istate;
 
-	foreach(lc, miinfo->multiInsertBuffers)
+	foreach(lc, *mirri)
 	{
-		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
+		rri = lfirst(lc);
+		buffer = rri->ri_CopyMultiInsertBuffer;
+		istate = buffer->istate;
+
+		table_multi_insert_flush(istate);
 
-		CopyMultiInsertBufferFlush(miinfo, buffer);
+		if (istate->flushed)
+			HandleAfterRowEvents(rri, estate, cstate, istate->mi_cur_slots);
 	}
 
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
+	rri = NULL;
+	buffer = NULL;
+	istate = NULL;
 
 	/*
 	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
@@ -435,87 +304,59 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
 	 * likely that these older ones will be needed than the ones that were
 	 * just created.
 	 */
-	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+	while (list_length(*mirri) > MAX_PARTITION_BUFFERS)
 	{
-		CopyMultiInsertBuffer *buffer;
+		int ti_options;
 
-		buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+		rri = (ResultRelInfo *) linitial(*mirri);
 
 		/*
 		 * We never want to remove the buffer that's currently being used, so
 		 * if we happen to find that then move it to the end of the list.
 		 */
-		if (buffer->resultRelInfo == curr_rri)
+		if (rri == curr_rri)
 		{
-			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-			buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+			*mirri = list_delete_first(*mirri);
+			*mirri = lappend(*mirri, rri);
+			rri = (ResultRelInfo *) linitial(*mirri);
 		}
 
-		CopyMultiInsertBufferCleanup(miinfo, buffer);
-		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-	}
-}
-
-/*
- * Cleanup allocated buffers and free memory
- */
-static inline void
-CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
-{
-	ListCell   *lc;
+		buffer = rri->ri_CopyMultiInsertBuffer;
+		istate = buffer->istate;
+		ti_options = istate->options;
 
-	foreach(lc, miinfo->multiInsertBuffers)
-		CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
+		table_insert_end(istate);
+		table_finish_bulk_insert(rri->ri_RelationDesc, ti_options);
 
-	list_free(miinfo->multiInsertBuffers);
+		*mirri = list_delete_first(*mirri);
+	}
 }
 
 /*
- * Get the next TupleTableSlot that the next tuple should be stored in.
- *
- * Callers must ensure that the buffer is not full.
- *
- * Note: 'miinfo' is unused but has been included for consistency with the
- * other functions in this area.
+ * End the insert state for each buffered relation, dropping the buffered
+ * slots and freeing the buffers.
  */
-static inline TupleTableSlot *
-CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
-								ResultRelInfo *rri)
+static void
+CopyMultiInsertDropBuffers(List *mirri)
 {
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-	int			nused = buffer->nused;
-
-	Assert(buffer != NULL);
-	Assert(nused < MAX_BUFFERED_TUPLES);
+	ListCell   *lc;
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
-}
+	foreach(lc, mirri)
+	{
+		int ti_options;
+		ResultRelInfo *rri = lfirst(lc);
+		CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+		TableInsertState *istate = buffer->istate;
 
-/*
- * Record the previously reserved TupleTableSlot that was reserved by
- * CopyMultiInsertInfoNextFreeSlot as being consumed.
- */
-static inline void
-CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						 TupleTableSlot *slot, int tuplen, uint64 lineno)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+		ti_options = istate->options;
 
-	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+		table_insert_end(istate);
 
-	/* Store the line number so we can properly report any errors later */
-	buffer->linenos[buffer->nused] = lineno;
+		table_finish_bulk_insert(rri->ri_RelationDesc, ti_options);
 
-	/* Record this slot as being used */
-	buffer->nused++;
+		pfree(buffer);
+	}
 
-	/* Update how many tuples are stored and their size */
-	miinfo->bufferedTuples++;
-	miinfo->bufferedBytes += tuplen;
+	list_free(mirri);
 }
 
 /*
@@ -530,21 +371,21 @@ CopyFrom(CopyFromState cstate)
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
-	TupleTableSlot *singleslot = NULL;
+	TupleTableSlot *slot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			ti_options = 0; /* start with default options for insert */
-	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
-	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
 	int64		processed = 0;
 	int64		excluded = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
+	List       *multi_insert_rris = NIL;
+	TableInsertState *istate = NULL;
 
 	Assert(cstate->rel);
 	Assert(list_length(cstate->range_table) == 1);
@@ -729,7 +570,7 @@ CopyFrom(CopyFromState cstate)
 		 * For partitioned tables we can't support multi-inserts when there
 		 * are any statement level insert triggers. It might be possible to
 		 * allow partitioned tables with such triggers in the future, but for
-		 * now, CopyMultiInsertInfoFlush expects that any before row insert
+		 * now, CopyMultiInsertFlushBuffers expects that any before row insert
 		 * and statement level insert triggers are on the same relation.
 		 */
 		insertMethod = CIM_SINGLE;
@@ -777,22 +618,22 @@ CopyFrom(CopyFromState cstate)
 		else
 			insertMethod = CIM_MULTI;
 
-		CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
-								estate, mycid, ti_options);
+		/*
+		 * Only set up the buffer when not dealing with a partitioned table.
+		 * Buffers for partitioned tables will just be set up when we need
+		 * to send tuples their way for the first time.
+		 */
+		if (!proute)
+			InitCopyMultiInsertBufferInfo(&multi_insert_rris, resultRelInfo,
+										  mycid, ti_options);
 	}
 
 	/*
-	 * If not using batch mode (which allocates slots as needed) set up a
-	 * tuple slot too. When inserting into a partitioned table, we also need
-	 * one, even if we might batch insert, to read the tuple in the root
-	 * partition's form.
+	 * Set up a tuple slot into which the input data from the copy stream is
+	 * read, and which is then used for inserting into the table.
 	 */
-	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
-	{
-		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
-									   &estate->es_tupleTable);
-		bistate = GetBulkInsertState();
-	}
+	slot = table_slot_create(resultRelInfo->ri_RelationDesc,
+							 &estate->es_tupleTable);
 
 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
 								  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
@@ -830,19 +671,8 @@ CopyFrom(CopyFromState cstate)
 		ResetPerTupleExprContext(estate);
 
 		/* select slot to (initially) load row into */
-		if (insertMethod == CIM_SINGLE || proute)
-		{
-			myslot = singleslot;
-			Assert(myslot != NULL);
-		}
-		else
-		{
-			Assert(resultRelInfo == target_resultRelInfo);
-			Assert(insertMethod == CIM_MULTI);
-
-			myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
-													 resultRelInfo);
-		}
+		myslot = slot;
+		Assert(myslot != NULL);
 
 		/*
 		 * Switch to per-tuple context before calling NextCopyFrom, which does
@@ -918,21 +748,22 @@ CopyFrom(CopyFromState cstate)
 				if (leafpart_use_multi_insert)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
-						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+						InitCopyMultiInsertBufferInfo(&multi_insert_rris,
+													  resultRelInfo, mycid,
+													  ti_options);
 				}
-				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
-						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+				else if (insertMethod == CIM_MULTI_CONDITIONAL)
 				{
 					/*
 					 * Flush pending inserts if this partition can't use
 					 * batching, so rows are visible to triggers etc.
 					 */
-					CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					CopyMultiInsertFlushBuffers(&multi_insert_rris,
+												resultRelInfo, cstate, estate);
 				}
 
-				if (bistate != NULL)
-					ReleaseBulkInsertStatePin(bistate);
+				if (istate && istate->bistate)
+					ReleaseBulkInsertStatePin(istate->bistate);
 				prevResultRelInfo = resultRelInfo;
 			}
 
@@ -974,8 +805,8 @@ CopyFrom(CopyFromState cstate)
 				/* no other path available for partitioned table */
 				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
 
-				batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
-															resultRelInfo);
+				batchslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+											  &estate->es_tupleTable);
 
 				if (map != NULL)
 					myslot = execute_attr_map_slot(map->attrMap, myslot,
@@ -1047,24 +878,9 @@ CopyFrom(CopyFromState cstate)
 				/* Store the slot in the multi-insert buffer, when enabled. */
 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
-					/*
-					 * The slot previously might point into the per-tuple
-					 * context. For batching it needs to be longer lived.
-					 */
-					ExecMaterializeSlot(myslot);
-
 					/* Add this tuple to the tuple buffer */
-					CopyMultiInsertInfoStore(&multiInsertInfo,
-											 resultRelInfo, myslot,
-											 cstate->line_buf.len,
-											 cstate->cur_lineno);
-
-					/*
-					 * If enough inserts have queued up, then flush all
-					 * buffers out to their tables.
-					 */
-					if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
-						CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					CopyMultiInsertBufferTuple(resultRelInfo, myslot, cstate,
+											   estate);
 				}
 				else
 				{
@@ -1090,9 +906,19 @@ CopyFrom(CopyFromState cstate)
 					}
 					else
 					{
+						if (!istate)
+						{
+							istate = table_insert_begin(resultRelInfo->ri_RelationDesc,
+														mycid,
+														ti_options,
+														true,
+														false);
+						}
+
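+						/*
+						 * Point the shared single insert state at the
+						 * current target relation.
+						 */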
+						istate->rel = resultRelInfo->ri_RelationDesc;
+
 						/* OK, store the tuple and create index entries for it */
-						table_tuple_insert(resultRelInfo->ri_RelationDesc,
-										   myslot, mycid, ti_options, bistate);
+						table_tuple_insert_v2(istate, myslot);
 
 						if (resultRelInfo->ri_NumIndices > 0)
 							recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
@@ -1125,16 +951,14 @@ CopyFrom(CopyFromState cstate)
 
 	/* Flush any remaining buffered tuples */
 	if (insertMethod != CIM_SINGLE)
-	{
-		if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
-			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
-	}
+		CopyMultiInsertFlushBuffers(&multi_insert_rris, resultRelInfo,
+									cstate, estate);
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	if (bistate != NULL)
-		FreeBulkInsertState(bistate);
+	if (istate)
+		table_insert_end(istate);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -1154,7 +978,7 @@ CopyFrom(CopyFromState cstate)
 
 	/* Tear down the multi-insert buffer data */
 	if (insertMethod != CIM_SINGLE)
-		CopyMultiInsertInfoCleanup(&multiInsertInfo);
+		CopyMultiInsertDropBuffers(multi_insert_rris);
 
 	/* Close all the partitioned tables, leaf partitions, and their indices */
 	if (proute)
-- 
2.25.1
