From 16d488cff14c0ce9ace648a1b99e507edb184e74 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <rupiredd@amazon.com>
Date: Wed, 30 Oct 2024 17:30:41 +0000
Subject: [PATCH v25 3/3] Use new multi-inserts table AM for COPY ... FROM

This commit uses the new multi-inserts table AM added by commit
<<CHANGE_ME>> for COPY ... FROM command.

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/commands/copyfrom.c          | 254 +++++++++++++++--------
 src/include/commands/copyfrom_internal.h |   4 +-
 src/tools/pgindent/typedefs.list         |   1 +
 3 files changed, 171 insertions(+), 88 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 07cbd5d22b..18fb609cbe 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -74,14 +74,27 @@
  */
 #define MAX_PARTITION_BUFFERS	32
 
+/* Context for multi-inserts buffer flush callback */
+typedef struct MultiInsertBufferFlushCtx
+{
+	CopyFromState cstate;
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+} MultiInsertBufferFlushCtx;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableModifyState *mstate;	/* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
+	TupleTableSlot *mislot;		/* Slot used for multi-inserts */
+	MultiInsertBufferFlushCtx *mibufferctx; /* Multi-inserts buffer flush
+											 * callback context */
 	int			nused;			/* number of 'slots' containing tuples */
+	int			currslotno;		/* Current buffered slot number that's being
+								 * flushed; Used to get correct cur_lineno for
+								 * errors while in flush callback. */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
 } CopyMultiInsertBuffer;
@@ -216,19 +229,96 @@ CopyLimitPrintoutLength(const char *str)
 	return res;
 }
 
+/*
+ * Implements for multi-inserts buffer flush callback
+ * i.e. TableModifyEndCallback.
+ *
+ * NB: Caller must take care of opening and closing the indices.
+ */
+static void
+MultiInsertBufferFlushCb(void *context, TupleTableSlot *slot)
+{
+	MultiInsertBufferFlushCtx *mibufferctx = (MultiInsertBufferFlushCtx *) context;
+	CopyFromState cstate = mibufferctx->cstate;
+	ResultRelInfo *resultRelInfo = mibufferctx->resultRelInfo;
+	EState	   *estate = mibufferctx->estate;
+	CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 cstate->transition_capture);
+
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+	 * anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 cstate->transition_capture);
+	}
+
+	Assert(buffer->currslotno <= buffer->nused);
+}
+
 /*
  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						  CopyFromState cstate, EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		buffer->mibufferctx =
+			(MultiInsertBufferFlushCtx *) palloc(sizeof(MultiInsertBufferFlushCtx));
+		buffer->mibufferctx->cstate = cstate;
+		buffer->mibufferctx->resultRelInfo = rri;
+		buffer->mibufferctx->estate = estate;
+
+		buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+											miinfo->mycid,
+											miinfo->ti_options,
+											MultiInsertBufferFlushCb,
+											buffer->mibufferctx);
+
+		buffer->slots = NULL;
+	}
+	else
+	{
+		buffer->mstate = NULL;
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+	}
+
+	buffer->mislot = NULL;
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -239,11 +329,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
  */
 static inline void
 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
+							   ResultRelInfo *rri, CopyFromState cstate,
+							   EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -276,7 +367,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	 * tuples their way for the first time.
 	 */
 	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+		CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
 }
 
 /*
@@ -320,8 +411,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -393,13 +482,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -407,56 +491,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
-		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
-		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-		for (i = 0; i < nused; i++)
-		{
-			/*
-			 * If there are any indexes, update them for all the inserted
-			 * tuples, and run AFTER ROW INSERT triggers.
-			 */
-			if (resultRelInfo->ri_NumIndices > 0)
-			{
-				List	   *recheckIndexes;
-
-				cstate->cur_lineno = buffer->linenos[i];
-				recheckIndexes =
-					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
-										  false, NULL, NIL, false);
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], recheckIndexes,
-									 cstate->transition_capture);
-				list_free(recheckIndexes);
-			}
+		table_modify_buffer_flush(buffer->mstate);
 
-			/*
-			 * There's no indexes, but see if we need to run AFTER ROW INSERT
-			 * triggers anyway.
-			 */
-			else if (resultRelInfo->ri_TrigDesc != NULL &&
-					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-			{
-				cstate->cur_lineno = buffer->linenos[i];
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], NIL,
-									 cstate->transition_capture);
-			}
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-			ExecClearTuple(slots[i]);
-		}
+		/*
+		 * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+		 * in the flush callback CopyModifyBufferFlushCallback.
+		 */
 
 		/* Update the row counter and progress of the COPY command */
 		*processed += nused;
@@ -492,19 +538,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
 	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
+		table_modify_end(buffer->mstate);
+		ExecDropSingleTupleTableSlot(buffer->mislot);
+		pfree(buffer->mibufferctx);
 	}
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -598,15 +643,36 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(buffer->nused < MAX_BUFFERED_TUPLES);
 
 	nused = buffer->nused;
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		if (buffer->mislot == NULL)
+		{
+			buffer->mislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+												&TTSOpsVirtual);
+		}
+
+		/* Caller must clear the slot */
+		slot = buffer->mislot;
+	}
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -620,7 +686,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 
 	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+
+#ifdef USE_ASSERT_CHECKING
+	if (rri->ri_FdwRoutine != NULL)
+		Assert(slot == buffer->slots[buffer->nused]);
+#endif
 
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
@@ -628,6 +698,22 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	/* Record this slot as being used */
 	buffer->nused++;
 
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		Assert(slot == buffer->mislot);
+		buffer->currslotno = 0;
+
+		table_modify_buffer_insert(buffer->mstate, slot);
+	}
+	else
+	{
+		/*
+		 * The slot previously might point into the per-tuple context. For
+		 * batching it needs to be longer lived.
+		 */
+		ExecMaterializeSlot(slot);
+	}
+
 	/* Update how many tuples are stored and their size */
 	miinfo->bufferedTuples++;
 	miinfo->bufferedBytes += tuplen;
@@ -841,7 +927,7 @@ CopyFrom(CopyFromState cstate)
 	/*
 	 * It's generally more efficient to prepare a bunch of tuples for
 	 * insertion, and insert them in one
-	 * table_multi_insert()/ExecForeignBatchInsert() call, than call
+	 * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
 	 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
 	 * However, there are a number of reasons why we might not be able to do
 	 * this.  These are explained below.
@@ -925,7 +1011,8 @@ CopyFrom(CopyFromState cstate)
 			insertMethod = CIM_MULTI;
 
 		CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
-								estate, mycid, ti_options);
+								estate, mycid,
+								ti_options | TABLE_INSERT_BAS_BULKWRITE);
 	}
 
 	/*
@@ -1094,7 +1181,8 @@ CopyFrom(CopyFromState cstate)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
 						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+													   resultRelInfo, cstate,
+													   estate);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
 						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
@@ -1224,12 +1312,6 @@ CopyFrom(CopyFromState cstate)
 				/* Store the slot in the multi-insert buffer, when enabled. */
 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
-					/*
-					 * The slot previously might point into the per-tuple
-					 * context. For batching it needs to be longer lived.
-					 */
-					ExecMaterializeSlot(myslot);
-
 					/* Add this tuple to the tuple buffer */
 					CopyMultiInsertInfoStore(&multiInsertInfo,
 											 resultRelInfo, myslot,
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
 typedef enum CopyInsertMethod
 {
 	CIM_SINGLE,					/* use table_tuple_insert or ExecForeignInsert */
-	CIM_MULTI,					/* always use table_multi_insert or
+	CIM_MULTI,					/* always use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert */
-	CIM_MULTI_CONDITIONAL,		/* use table_multi_insert or
+	CIM_MULTI_CONDITIONAL,		/* use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e7ddf29c16..bf21e43ce1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1664,6 +1664,7 @@ MonotonicFunction
 MorphOpaque
 MsgType
 MultiAssignRef
+MultiInsertBufferFlushCtx
 MultiSortSupport
 MultiSortSupportData
 MultiXactId
-- 
2.40.1

