On Thu, Apr 25, 2024 at 10:11 PM Jeff Davis <pg...@j-davis.com> wrote:
>
> On Wed, 2024-04-24 at 18:19 +0530, Bharath Rupireddy wrote:
> > I added a flush callback named TableModifyBufferFlushCallback; when
> > provided by callers invoked after tuples are flushed to disk from the
> > buffers but before the AM frees them up. Index insertions and AFTER
> > ROW INSERT triggers can be executed in this callback. See the v19-
> > 0001 patch for how AM invokes the flush callback, and see either v19-
> > 0003 or v19-0004 or v19-0005 for how a caller can supply the callback
> > and required context to execute index insertions and AR triggers.
>
> The flush callback takes a pointer to an array of slot pointers, and I
> don't think that's the right API. I think the callback should be called
> on each slot individually.
>
> We shouldn't assume that a table AM stores buffered inserts as an array
> of slot pointers. A TupleTableSlot has a fair amount of memory overhead
> (64 bytes), so most AMs wouldn't want to pay that overhead for every
> tuple. COPY does, but that's because the number of buffered tuples is
> fairly small.

I get your point. An AM can choose to implement its buffering strategy
by storing plain tuples rather than tuple slots, in which case a flush
callback that takes an array of tuple slots won't work. Therefore, I
have now changed the flush callback to accept a single tuple slot; it
is invoked once for each flushed tuple.

> > > 11. Deprecate the multi_insert API.
> >
> > I did remove both table_multi_insert and table_finish_bulk_insert in
> > v19-0006.
>
> That's OK with me. Let's leave those functions out for now.

Okay. Dropped the 0006 patch for now.

Please see the attached v20 patch set.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 06cd6e242cd2fa3514aa1c76596bc4b4ad330040 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 27 Apr 2024 15:40:27 +0000
Subject: [PATCH v20 1/5] Introduce new Table Access Methods for single and
 multi inserts

---
 src/backend/access/heap/heapam.c         | 205 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   6 +
 src/backend/access/table/tableam.c       |  97 +++++++++++
 src/backend/access/table/tableamapi.c    |  10 ++
 src/include/access/heapam.h              |  44 +++++
 src/include/access/tableam.h             | 145 ++++++++++++++++
 src/tools/pgindent/typedefs.list         |   3 +
 7 files changed, 509 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4a4cf76269..fdc50c42df 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -64,6 +64,7 @@
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -112,7 +113,7 @@ static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
-
+static void heap_modify_insert_end_callback(TableModifyState *state);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2608,6 +2609,208 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Create the heap modify state, allocated in a memory context of its own.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel, int modify_flags,
+				  CommandId cid, int options,
+				  TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				  void *modify_buffer_flush_context)
+{
+	MemoryContext mcxt;
+	TableModifyState *mstate;
+
+	/* The state itself is allocated in mcxt, a child of the xact context */
+	mcxt = AllocSetContextCreate(TopTransactionContext,
+								 "heap_modify memory context",
+								 ALLOCSET_DEFAULT_SIZES);
+
+	/* Zeroed allocation, so mstate->data starts out as NULL */
+	mstate = MemoryContextAllocZero(mcxt, sizeof(TableModifyState));
+
+	mstate->mem_cxt = mcxt;
+	mstate->rel = rel;
+	mstate->cid = cid;
+	mstate->options = options;
+	mstate->modify_flags = modify_flags;
+	mstate->modify_buffer_flush_context = modify_buffer_flush_context;
+	mstate->modify_buffer_flush_callback = modify_buffer_flush_callback;
+	mstate->modify_end_callback = NULL;	/* To be installed lazily */
+
+	return mstate;
+}
+
+/*
+ * Copy the passed-in tuple into an in-memory buffered slot. Once the buffers
+ * are full (by slot count or size), insert all buffered tuples into the heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+						  TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_cxt);
+
+	/* First time through, initialize heap insert state */
+	if (state->data == NULL)
+	{
+		istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
+		istate->bistate = NULL;
+		istate->mistate = NULL;
+		state->data = istate;
+
+		if ((state->modify_flags & TM_FLAG_MULTI_INSERTS) != 0)
+		{
+			mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+			mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+			istate->mistate = mistate;
+			mistate->mem_cxt = AllocSetContextCreate(CurrentMemoryContext,
+													 "heap_multi_insert memory context",
+													 ALLOCSET_DEFAULT_SIZES);
+		}
+
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			istate->bistate = GetBulkInsertState();
+
+		state->modify_end_callback = heap_modify_insert_end_callback;
+	}
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);	/* i.e. needs TM_FLAG_MULTI_INSERTS */
+	mistate = istate->mistate;
+
+	dstslot = mistate->slots[mistate->cur_slots];
+	if (dstslot == NULL)
+	{
+		/*
+		 * We use virtual tuple slots as the buffered slots to leverage the
+		 * optimization they provide to minimize physical data copying. The
+		 * virtual slot gets materialized when we copy (via below
+		 * ExecCopySlot) the tuples from the source slot which can be of any
+		 * type. This way, it is ensured that the tuple storage doesn't depend
+		 * on external memory, because all the datums that aren't passed by
+		 * value are copied into the slot's memory context.
+		 */
+		dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+									 &TTSOpsVirtual);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	ExecClearTuple(dstslot);
+	ExecCopySlot(dstslot, slot);
+
+	mistate->cur_slots++;
+
+	/*
+	 * Track the space used by buffered tuples via the slot's memory context.
+	 * NOTE(review): tts_mcxt looks shared by all slots here; confirm sizing.
+	 */
+	if (TTS_SHOULDFREE(dstslot))
+		mistate->cur_size += MemoryContextMemAllocated(dstslot->tts_mcxt, false);
+
+	if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+		mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)
+		heap_modify_buffer_flush(state);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Push every buffered slot into the heap with one multi insert call.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+	HeapInsertState *hstate = (HeapInsertState *) state->data;
+	HeapMultiInsertState *buffered;
+	MemoryContext saved_cxt;
+
+	/* No insert has gone through this state yet */
+	if (hstate == NULL)
+		return;
+
+	buffered = hstate->mistate;
+	Assert(buffered != NULL);
+
+	/* The buffer is empty, so there is nothing to write */
+	if (buffered->cur_slots == 0)
+		return;
+
+	/*
+	 * Run heap_multi_insert in the short-lived context, as it may leak
+	 * memory, and discard whatever it left behind as soon as it returns.
+	 */
+	saved_cxt = MemoryContextSwitchTo(buffered->mem_cxt);
+	heap_multi_insert(state->rel, buffered->slots, buffered->cur_slots,
+					  state->cid, state->options, hstate->bistate);
+	MemoryContextSwitchTo(saved_cxt);
+	MemoryContextReset(buffered->mem_cxt);
+
+	/* Let the caller see each slot that was just written out */
+	if (state->modify_buffer_flush_callback != NULL)
+	{
+		for (int i = 0; i < buffered->cur_slots; i++)
+			state->modify_buffer_flush_callback(state->modify_buffer_flush_context,
+												buffered->slots[i]);
+	}
+
+	/* Mark the buffer as empty again */
+	buffered->cur_slots = 0;
+	buffered->cur_size = 0;
+}
+
+/*
+ * End callback for heap inserts. Flushes whatever is still buffered, drops
+ * the buffered slots together with the multi insert memory context, and
+ * frees the bulk insert state.
+ */
+static void
+heap_modify_insert_end_callback(TableModifyState *state)
+{
+	HeapInsertState *hstate = (HeapInsertState *) state->data;
+	HeapMultiInsertState *buffered;
+
+	/* No insert has gone through this state, nothing to clean up */
+	if (hstate == NULL)
+		return;
+
+	buffered = hstate->mistate;
+	if (buffered != NULL)
+	{
+		int			idx = 0;
+
+		/* Write out the leftovers; the buffer must be empty afterwards */
+		heap_modify_buffer_flush(state);
+		Assert(buffered->cur_slots == 0 && buffered->cur_size == 0);
+
+		/* Slots are created in order, so stop at the first unused entry */
+		while (idx < HEAP_MAX_BUFFERED_SLOTS && buffered->slots[idx] != NULL)
+			ExecDropSingleTupleTableSlot(buffered->slots[idx++]);
+
+		MemoryContextDelete(buffered->mem_cxt);
+	}
+
+	if (hstate->bistate != NULL)
+		FreeBulkInsertState(hstate->bistate);
+}
+
+/* Run the end callback, if installed, then delete the state's context. */
+void
+heap_modify_end(TableModifyState *state)
+{
+	TableModifyEndCallback end_cb = state->modify_end_callback;
+
+	if (end_cb != NULL)
+		end_cb(state);
+
+	MemoryContextDelete(state->mem_cxt);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6f8b1b7929..eda0c73a16 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2615,6 +2615,12 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_modify_begin = heap_modify_begin,
+	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
+	.tuple_modify_buffer_flush = heap_modify_buffer_flush,
+	.tuple_modify_end = heap_modify_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index e57a0b7ea3..35a3e43c59 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -21,6 +21,7 @@
 
 #include <math.h>
 
+#include "access/heapam.h"		/* just for BulkInsertState */
 #include "access/syncscan.h"
 #include "access/tableam.h"
 #include "access/xact.h"
@@ -29,6 +30,7 @@
 #include "storage/bufmgr.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
+#include "utils/memutils.h"
 
 /*
  * Constants to control the behavior of block allocation to parallel workers
@@ -48,6 +50,7 @@
 char	   *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
 bool		synchronize_seqscans = true;
 
+static void default_table_modify_insert_end_callback(TableModifyState *state);
 
 /* ----------------------------------------------------------------------------
  * Slot functions.
@@ -756,3 +759,97 @@ table_block_relation_estimate_size(Relation rel, int32 *attr_widths,
 	else
 		*allvisfrac = (double) relallvisible / curpages;
 }
+
+/*
+ * Create the default table modify state in a memory context of its own.
+ */
+TableModifyState *
+default_table_modify_begin(Relation rel, int modify_flags,
+						   CommandId cid, int options,
+						   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+						   void *modify_buffer_flush_context)
+{
+	MemoryContext mcxt;
+	TableModifyState *mstate;
+
+	/* The state itself is allocated in mcxt, a child of the current one */
+	mcxt = AllocSetContextCreate(CurrentMemoryContext,
+								 "default_table_modify memory context",
+								 ALLOCSET_DEFAULT_SIZES);
+
+	/* Zeroed allocation, so mstate->data starts out as NULL */
+	mstate = MemoryContextAllocZero(mcxt, sizeof(TableModifyState));
+
+	mstate->mem_cxt = mcxt;
+	mstate->rel = rel;
+	mstate->cid = cid;
+	mstate->options = options;
+	mstate->modify_flags = modify_flags;
+	mstate->modify_buffer_flush_context = modify_buffer_flush_context;
+	mstate->modify_buffer_flush_callback = modify_buffer_flush_callback;
+	mstate->modify_end_callback = NULL;	/* To be installed lazily */
+
+	return mstate;
+}
+
+/*
+ * Default table modify implementation for inserts. Nothing is buffered: the
+ * tuple is inserted right away and then passed to the flush callback, if any.
+ */
+void
+default_table_modify_buffer_insert(TableModifyState *state,
+								   TupleTableSlot *slot)
+{
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->mem_cxt);
+
+	/* First time through, initialize default table modify state */
+	if (state->data == NULL)
+	{
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			state->data = (BulkInsertState) GetBulkInsertState();
+
+		state->modify_end_callback = default_table_modify_insert_end_callback;
+	}
+
+	/* Fallback to table AM single insert routine */
+	table_tuple_insert(state->rel, slot, state->cid, state->options,
+					   (BulkInsertState) state->data);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* The tuple is already in the table, so report it as flushed right away */
+	if (state->modify_buffer_flush_callback != NULL)
+		state->modify_buffer_flush_callback(state->modify_buffer_flush_context,
+											slot);
+}
+
+/*
+ * Default table modify implementation for flush (there is nothing to do).
+ */
+void
+default_table_modify_buffer_flush(TableModifyState *state)
+{
+	/* no-op: the default insert routine writes each tuple immediately */
+}
+
+/*
+ * Default end callback for inserts: free the bulk insert state, if any.
+ */
+static void
+default_table_modify_insert_end_callback(TableModifyState *state)
+{
+	if (state->data == NULL)
+		return;
+	FreeBulkInsertState((BulkInsertState) state->data);
+}
+
+/* Run the end callback, if installed, then delete the state's context. */
+void
+default_table_modify_end(TableModifyState *state)
+{
+	TableModifyEndCallback end_cb = state->modify_end_callback;
+
+	if (end_cb != NULL)
+		end_cb(state);
+
+	MemoryContextDelete(state->mem_cxt);
+}
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index ce637a5a5d..96ac951af6 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,16 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->scan_sample_next_block != NULL);
 	Assert(routine->scan_sample_next_tuple != NULL);
 
+	/* optional, but either all of them are defined or none. */
+	Assert((routine->tuple_modify_begin == NULL &&
+			routine->tuple_modify_buffer_insert == NULL &&
+			routine->tuple_modify_buffer_flush == NULL &&
+			routine->tuple_modify_end == NULL) ||
+		   (routine->tuple_modify_begin != NULL &&
+			routine->tuple_modify_buffer_insert != NULL &&
+			routine->tuple_modify_buffer_flush != NULL &&
+			routine->tuple_modify_end != NULL));
+
 	return routine;
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index c47a5045ce..c10ebbb5ea 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -271,6 +271,38 @@ typedef enum
 	PRUNE_VACUUM_CLEANUP,		/* VACUUM 2nd heap pass */
 } PruneReason;
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES		65535
+
+typedef struct HeapMultiInsertState
+{
+	/* Array of HEAP_MAX_BUFFERED_SLOTS buffered slots, created on demand */
+	TupleTableSlot **slots;
+
+	/* Number of buffered slots currently held; reset to 0 by each flush */
+	int			cur_slots;
+
+	/* Approximate size of all tuples currently held in buffered slots */
+	Size		cur_size;
+
+	MemoryContext mem_cxt;		/* short-lived, reset after every flush */
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;	/* set for TM_FLAG_BAS_BULKWRITE */
+	HeapMultiInsertState *mistate;	/* set for TM_FLAG_MULTI_INSERTS */
+} HeapInsertState;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -321,6 +353,18 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+										   int modify_flags,
+										   CommandId cid,
+										   int options,
+										   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+										   void *modify_buffer_flush_context);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+									  TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8e583b45cd..2e96154d6e 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -255,6 +255,42 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Table modify flags */
+
+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TM_FLAG_MULTI_INSERTS	0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TM_FLAG_BAS_BULKWRITE	0x000002
+
+struct TableModifyState;
+
+/* Callback invoked for each tuple that gets flushed to disk from buffer */
+typedef void (*TableModifyBufferFlushCallback) (void *context,
+												TupleTableSlot *slot);
+
+/* Table AM specific callback that gets called in table_modify_end() */
+typedef void (*TableModifyEndCallback) (struct TableModifyState *state);
+
+/* Holds table modify state, from table_modify_begin() to table_modify_end() */
+typedef struct TableModifyState
+{
+	Relation	rel;			/* relation being modified */
+	int			modify_flags;	/* TM_FLAG_* bits */
+	MemoryContext mem_cxt;		/* context this state is allocated in */
+	CommandId	cid;			/* passed on to the insert routine */
+	int			options;		/* passed on to the insert routine */
+
+	/* Flush callback (may be NULL) and its context */
+	TableModifyBufferFlushCallback modify_buffer_flush_callback;
+	void	   *modify_buffer_flush_context;
+
+	/* Table AM specific data, NULL until the first insert */
+	void	   *data;
+
+	TableModifyEndCallback modify_end_callback; /* NULL until installed */
+} TableModifyState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -578,6 +614,21 @@ typedef struct TableAmRoutine
 	void		(*finish_bulk_insert) (Relation rel, int options);
 
 
+	/* ------------------------------------------------------------------------
+	 * Table Modify related functions.
+	 * ------------------------------------------------------------------------
+	 */
+	TableModifyState *(*tuple_modify_begin) (Relation rel,
+											 int modify_flags,
+											 CommandId cid,
+											 int options,
+											 TableModifyBufferFlushCallback modify_buffer_flush_callback,
+											 void *modify_buffer_flush_context);
+	void		(*tuple_modify_buffer_insert) (TableModifyState *state,
+											   TupleTableSlot *slot);
+	void		(*tuple_modify_buffer_flush) (TableModifyState *state);
+	void		(*tuple_modify_end) (TableModifyState *state);
+
 	/* ------------------------------------------------------------------------
 	 * DDL related functionality.
 	 * ------------------------------------------------------------------------
@@ -1609,6 +1660,100 @@ table_finish_bulk_insert(Relation rel, int options)
 		rel->rd_tableam->finish_bulk_insert(rel, options);
 }
 
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+extern TableModifyState *default_table_modify_begin(Relation rel, int modify_flags,
+													CommandId cid, int options,
+													TableModifyBufferFlushCallback modify_buffer_flush_callback,
+													void *modify_buffer_flush_context);
+extern void default_table_modify_buffer_insert(TableModifyState *state,
+											   TupleTableSlot *slot);
+extern void default_table_modify_buffer_flush(TableModifyState *state);
+extern void default_table_modify_end(TableModifyState *state);
+
+static inline TableModifyState *
+table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options,
+				   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				   void *modify_buffer_flush_context)
+{
+	const TableAmRoutine *tableam = rel->rd_tableam;
+
+	/* Not expected: a relation without a table AM */
+	if (tableam == NULL)
+	{
+		Assert(false);
+		return NULL;
+	}
+
+	/* Fallback to a default implementation when the AM supplies none */
+	if (tableam->tuple_modify_begin == NULL)
+		return default_table_modify_begin(rel, modify_flags,
+										  cid, options,
+										  modify_buffer_flush_callback,
+										  modify_buffer_flush_context);
+
+	/* Otherwise the AM's own routine sets up the state */
+	return tableam->tuple_modify_begin(rel, modify_flags,
+									   cid, options,
+									   modify_buffer_flush_callback,
+									   modify_buffer_flush_context);
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+	const TableAmRoutine *tableam = state->rel->rd_tableam;
+
+	/* Not expected: a relation without a table AM */
+	if (tableam == NULL)
+	{
+		Assert(false);
+		return;
+	}
+	/* Use the AM's routine if it has one, else the default implementation */
+	if (tableam->tuple_modify_buffer_insert != NULL)
+		tableam->tuple_modify_buffer_insert(state, slot);
+	else
+		default_table_modify_buffer_insert(state, slot);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+	const TableAmRoutine *tableam = state->rel->rd_tableam;
+
+	/* Not expected: a relation without a table AM */
+	if (tableam == NULL)
+	{
+		Assert(false);
+		return;
+	}
+	/* Use the AM's routine if it has one, else the default implementation */
+	if (tableam->tuple_modify_buffer_flush != NULL)
+		tableam->tuple_modify_buffer_flush(state);
+	else
+		default_table_modify_buffer_flush(state);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+	const TableAmRoutine *tableam = state->rel->rd_tableam;
+
+	/* Not expected: a relation without a table AM */
+	if (tableam == NULL)
+	{
+		Assert(false);
+		return;
+	}
+	/* Use the AM's routine if it has one, else the default implementation */
+	if (tableam->tuple_modify_end != NULL)
+		tableam->tuple_modify_end(state);
+	else
+		default_table_modify_end(state);
+}
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e10ff28ee5..11744f2ccc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1130,6 +1130,8 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
 HeapPageFreeze
 HeapScanDesc
 HeapTuple
@@ -2843,6 +2845,7 @@ TableFuncScanState
 TableFuncType
 TableInfo
 TableLikeClause
+TableModifyState
 TableSampleClause
 TableScanDesc
 TableScanDescData
-- 
2.34.1

From dc9b8b01d586b64d18628424fff9ed14b20b92d7 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 27 Apr 2024 15:41:12 +0000
Subject: [PATCH v20 2/5] Optimize CTAS, CMV, RMV and TABLE REWRITES with multi
 inserts

---
 src/backend/commands/createas.c  | 27 +++++++++++----------------
 src/backend/commands/matview.c   | 26 +++++++++++---------------
 src/backend/commands/tablecmds.c | 31 +++++++++++--------------------
 3 files changed, 33 insertions(+), 51 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 62050f4dc5..2d6fffbf07 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -53,9 +53,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -552,17 +550,21 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
 	 * bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+		myState->mstate = table_modify_begin(intoRelationDesc,
+											 TM_FLAG_MULTI_INSERTS |
+											 TM_FLAG_BAS_BULKWRITE,
+											 GetCurrentCommandId(true),
+											 TABLE_INSERT_SKIP_FSM,
+											 NULL,
+											 NULL);
 	else
-		myState->bistate = NULL;
+		myState->mstate = NULL;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -590,11 +592,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+		table_modify_buffer_insert(myState->mstate, slot);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -612,10 +610,7 @@ intorel_shutdown(DestReceiver *self)
 	IntoClause *into = myState->into;
 
 	if (!into->skipData)
-	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
-	}
+		table_modify_end(myState->mstate);
 
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 6d09b75556..bb97e2fa5f 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -48,9 +48,7 @@ typedef struct
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
 	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -458,9 +456,14 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * Fill private fields of myState for use by later routines
 	 */
 	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->mstate = table_modify_begin(transientrel,
+										 TM_FLAG_MULTI_INSERTS |
+										 TM_FLAG_BAS_BULKWRITE,
+										 GetCurrentCommandId(true),
+										 TABLE_INSERT_SKIP_FSM |
+										 TABLE_INSERT_FROZEN,
+										 NULL,
+										 NULL);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -485,12 +488,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_modify_buffer_insert(myState->mstate, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -505,9 +503,7 @@ transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_modify_end(myState->mstate);
 
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3556240c8e..0c984aa656 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -6060,10 +6060,8 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	int			i;
 	ListCell   *l;
 	EState	   *estate;
-	CommandId	mycid;
-	BulkInsertState bistate;
-	int			ti_options;
 	ExprState  *partqualstate = NULL;
+	TableModifyState *mstate = NULL;
 
 	/*
 	 * Open the relation(s).  We have surely already locked the existing
@@ -6082,18 +6080,15 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	 * Prepare a BulkInsertState and options for table_tuple_insert.  The FSM
 	 * is empty, so don't bother using it.
 	 */
-	if (newrel)
+	if (newrel && mstate == NULL)
 	{
-		mycid = GetCurrentCommandId(true);
-		bistate = GetBulkInsertState();
-		ti_options = TABLE_INSERT_SKIP_FSM;
-	}
-	else
-	{
-		/* keep compiler quiet about using these uninitialized */
-		mycid = 0;
-		bistate = NULL;
-		ti_options = 0;
+		mstate = table_modify_begin(newrel,
+									TM_FLAG_MULTI_INSERTS |
+									TM_FLAG_BAS_BULKWRITE,
+									GetCurrentCommandId(true),
+									TABLE_INSERT_SKIP_FSM,
+									NULL,
+									NULL);
 	}
 
 	/*
@@ -6392,8 +6387,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 
 			/* Write the tuple out to the new relation */
 			if (newrel)
-				table_tuple_insert(newrel, insertslot, mycid,
-								   ti_options, bistate);
+				table_modify_buffer_insert(mstate, insertslot);
 
 			ResetExprContext(econtext);
 
@@ -6414,10 +6408,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	table_close(oldrel, NoLock);
 	if (newrel)
 	{
-		FreeBulkInsertState(bistate);
-
-		table_finish_bulk_insert(newrel, ti_options);
-
+		table_modify_end(mstate);
 		table_close(newrel, NoLock);
 	}
 }
-- 
2.34.1

From e60cb6e85ac4e319a3b619e24f66f6b44808fe70 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 27 Apr 2024 15:45:49 +0000
Subject: [PATCH v20 3/5] Optimize INSERT INTO ... SELECT with multi inserts

---
 src/backend/executor/nodeModifyTable.c | 170 ++++++++++++++++++++++---
 src/tools/pgindent/typedefs.list       |   1 +
 2 files changed, 153 insertions(+), 18 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index cee60d3659..cd044c9dee 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -114,6 +114,18 @@ typedef struct UpdateContext
 	LockTupleMode lockmode;
 } UpdateContext;
 
+typedef struct InsertModifyBufferFlushContext
+{
+	ResultRelInfo *resultRelInfo;	/* supplies index count and trigger desc */
+	EState	   *estate;			/* passed to index and trigger routines */
+	ModifyTableState *mtstate;	/* supplies mt_transition_capture */
+} InsertModifyBufferFlushContext;
+
+static InsertModifyBufferFlushContext *insert_modify_buffer_flush_context = NULL;
+static TableModifyState *table_modify_state = NULL;
+
+static void InsertModifyBufferFlushCallback(void *context,
+											TupleTableSlot *slot);
 
 static void ExecBatchInsert(ModifyTableState *mtstate,
 							ResultRelInfo *resultRelInfo,
@@ -726,6 +738,55 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
 	return ExecProject(newProj);
 }
 
+/*
+ * InsertModifyBufferFlushCallback
+ *		Flush callback that ExecInsert() registers via table_modify_begin().
+ *
+ * Called with one buffered tuple in 'slot'.  Inserts index entries for it
+ * when the result relation has indexes, then calls ExecARInsertTriggers().
+ * Returns without doing anything when the relation has no indexes, no AFTER
+ * ROW INSERT triggers and no INSERT transition table.
+ *
+ * 'context' is the InsertModifyBufferFlushContext filled in by ExecInsert().
+ * The caller must take care of opening and closing the indices.
+ */
+static void
+InsertModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	InsertModifyBufferFlushContext *flushcxt =
+		(InsertModifyBufferFlushContext *) context;
+	ResultRelInfo *rri = flushcxt->resultRelInfo;
+	bool		has_indexes = (rri->ri_NumIndices > 0);
+	bool		wants_ar_triggers;
+	List	   *recheck = NIL;
+
+	wants_ar_triggers =
+		(rri->ri_TrigDesc != NULL &&
+		 (rri->ri_TrigDesc->trig_insert_after_row ||
+		  rri->ri_TrigDesc->trig_insert_new_table));
+
+	/* Nothing to do when there are neither indexes nor triggers. */
+	if (!has_indexes && !wants_ar_triggers)
+		return;
+
+	/* Index entries first; the returned list is passed on to the triggers. */
+	if (has_indexes)
+		recheck = ExecInsertIndexTuples(rri,
+										slot, flushcxt->estate, false,
+										false, NULL, NIL, false);
+
+	/*
+	 * Reached with indexes and/or triggers present.  Without indexes the
+	 * list simply stays NIL.
+	 */
+	ExecARInsertTriggers(flushcxt->estate, rri,
+						 slot, recheck,
+						 flushcxt->mtstate->mt_transition_capture);
+
+	/* Harmless when the list is NIL. */
+	list_free(recheck);
+}
+
 /* ----------------------------------------------------------------
  *		ExecInsert
  *
@@ -751,7 +812,8 @@ ExecInsert(ModifyTableContext *context,
 		   TupleTableSlot *slot,
 		   bool canSetTag,
 		   TupleTableSlot **inserted_tuple,
-		   ResultRelInfo **insert_destrel)
+		   ResultRelInfo **insert_destrel,
+		   bool canMultiInsert)
 {
 	ModifyTableState *mtstate = context->mtstate;
 	EState	   *estate = context->estate;
@@ -764,6 +826,7 @@ ExecInsert(ModifyTableContext *context,
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
 	MemoryContext oldContext;
+	bool		ar_insert_triggers_executed = false;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -1126,17 +1189,53 @@ ExecInsert(ModifyTableContext *context,
 		}
 		else
 		{
-			/* insert the tuple normally */
-			table_tuple_insert(resultRelationDesc, slot,
-							   estate->es_output_cid,
-							   0, NULL);
+			if (canMultiInsert &&
+				proute == NULL &&
+				resultRelInfo->ri_WithCheckOptions == NIL &&
+				resultRelInfo->ri_projectReturning == NULL)
+			{
+				if (insert_modify_buffer_flush_context == NULL)
+				{
+					insert_modify_buffer_flush_context =
+						(InsertModifyBufferFlushContext *) palloc0(sizeof(InsertModifyBufferFlushContext));
+					insert_modify_buffer_flush_context->resultRelInfo = resultRelInfo;
+					insert_modify_buffer_flush_context->estate = estate;
+					insert_modify_buffer_flush_context->mtstate = mtstate;
+				}
 
-			/* insert index entries for tuple */
-			if (resultRelInfo->ri_NumIndices > 0)
-				recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-													   slot, estate, false,
-													   false, NULL, NIL,
-													   false);
+				if (table_modify_state == NULL)
+				{
+					table_modify_state = table_modify_begin(resultRelInfo->ri_RelationDesc,
+															TM_FLAG_MULTI_INSERTS,
+															estate->es_output_cid,
+															0,
+															InsertModifyBufferFlushCallback,
+															insert_modify_buffer_flush_context);
+				}
+
+				table_modify_buffer_insert(table_modify_state, slot);
+				ar_insert_triggers_executed = true;
+			}
+			else
+			{
+				/* insert the tuple normally */
+				table_tuple_insert(resultRelationDesc, slot,
+								   estate->es_output_cid,
+								   0, NULL);
+
+				/* insert index entries for tuple */
+				if (resultRelInfo->ri_NumIndices > 0)
+					recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+														   slot, estate, false,
+														   false, NULL, NIL,
+														   false);
+
+				ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+									 mtstate->mt_transition_capture);
+
+				list_free(recheckIndexes);
+				ar_insert_triggers_executed = true;
+			}
 		}
 	}
 
@@ -1170,10 +1269,12 @@ ExecInsert(ModifyTableContext *context,
 	}
 
 	/* AFTER ROW INSERT Triggers */
-	ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
-						 ar_insert_trig_tcs);
-
-	list_free(recheckIndexes);
+	if (!ar_insert_triggers_executed)
+	{
+		ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+							 ar_insert_trig_tcs);
+		list_free(recheckIndexes);
+	}
 
 	/*
 	 * Check any WITH CHECK OPTION constraints from parent views.  We are
@@ -1869,7 +1970,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
 	/* Tuple routing starts from the root table. */
 	context->cpUpdateReturningSlot =
 		ExecInsert(context, mtstate->rootResultRelInfo, slot, canSetTag,
-				   inserted_tuple, insert_destrel);
+				   inserted_tuple, insert_destrel, false);
 
 	/*
 	 * Reset the transition state that may possibly have been written by
@@ -3364,7 +3465,7 @@ ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 				mtstate->mt_merge_action = action;
 
 				rslot = ExecInsert(context, mtstate->rootResultRelInfo,
-								   newslot, canSetTag, NULL, NULL);
+								   newslot, canSetTag, NULL, NULL, false);
 				mtstate->mt_merge_inserted += 1;
 				break;
 			case CMD_NOTHING:
@@ -3749,6 +3850,10 @@ ExecModifyTable(PlanState *pstate)
 	HeapTupleData oldtupdata;
 	HeapTuple	oldtuple;
 	ItemPointer tupleid;
+	bool		canMultiInsert = false;
+
+	table_modify_state = NULL;
+	insert_modify_buffer_flush_context = NULL;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -3844,6 +3949,10 @@ ExecModifyTable(PlanState *pstate)
 		if (TupIsNull(context.planSlot))
 			break;
 
+		if (operation == CMD_INSERT &&
+			nodeTag(subplanstate) == T_SeqScanState)
+			canMultiInsert = true;
+
 		/*
 		 * When there are multiple result relations, each tuple contains a
 		 * junk column that gives the OID of the rel from which it came.
@@ -4057,7 +4166,7 @@ ExecModifyTable(PlanState *pstate)
 					ExecInitInsertProjection(node, resultRelInfo);
 				slot = ExecGetInsertNewTuple(resultRelInfo, context.planSlot);
 				slot = ExecInsert(&context, resultRelInfo, slot,
-								  node->canSetTag, NULL, NULL);
+								  node->canSetTag, NULL, NULL, canMultiInsert);
 				break;
 
 			case CMD_UPDATE:
@@ -4116,6 +4225,17 @@ ExecModifyTable(PlanState *pstate)
 			return slot;
 	}
 
+	if (table_modify_state != NULL)
+	{
+		Assert(operation == CMD_INSERT);
+
+		table_modify_end(table_modify_state);
+		table_modify_state = NULL;
+
+		pfree(insert_modify_buffer_flush_context);
+		insert_modify_buffer_flush_context = NULL;
+	}
+
 	/*
 	 * Insert remaining tuples for batch insert.
 	 */
@@ -4228,6 +4348,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	mtstate->mt_merge_updated = 0;
 	mtstate->mt_merge_deleted = 0;
 
+	table_modify_state = NULL;
+	insert_modify_buffer_flush_context = NULL;
+
 	/*----------
 	 * Resolve the target relation. This is the same as:
 	 *
@@ -4681,6 +4804,17 @@ ExecEndModifyTable(ModifyTableState *node)
 {
 	int			i;
 
+	if (table_modify_state != NULL)
+	{
+		Assert(node->operation == CMD_INSERT);
+
+		table_modify_end(table_modify_state);
+		table_modify_state = NULL;
+
+		pfree(insert_modify_buffer_flush_context);
+		insert_modify_buffer_flush_context = NULL;
+	}
+
 	/*
 	 * Allow any FDWs to shut down
 	 */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 11744f2ccc..57aabf51d8 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1226,6 +1226,7 @@ InjectionPointEntry
 InjectionPointSharedState
 InlineCodeBlock
 InProgressIO
+InsertModifyBufferFlushContext
 InsertStmt
 Instrumentation
 Int128AggState
-- 
2.34.1

From 4040ada4886220bb979edab83f7f88d32679dcbd Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 27 Apr 2024 15:48:25 +0000
Subject: [PATCH v20 4/5] Optimize Logical Replication apply with multi inserts

---
 src/backend/executor/execReplication.c   |  39 +++
 src/backend/replication/logical/proto.c  |  24 ++
 src/backend/replication/logical/worker.c | 351 ++++++++++++++++++++++-
 src/include/executor/executor.h          |   4 +
 src/include/replication/logicalproto.h   |   2 +
 src/tools/pgindent/typedefs.list         |   2 +
 6 files changed, 409 insertions(+), 13 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd577..fae1375537 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -544,6 +544,45 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 	}
 }
 
+/*
+ * Run BEFORE ROW INSERT triggers, compute stored generated columns and check
+ * constraints for 'slot', then pass it to table_modify_buffer_insert().
+ * Nothing is buffered when a BEFORE ROW trigger says "do nothing".
+ */
+void
+ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+						ResultRelInfo *resultRelInfo,
+						EState *estate, TupleTableSlot *slot)
+{
+	Relation	target = resultRelInfo->ri_RelationDesc;
+
+	/* For now we support only tables. */
+	Assert(target->rd_rel->relkind == RELKIND_RELATION);
+
+	CheckCmdReplicaIdentity(target, CMD_INSERT);
+
+	/* A BEFORE ROW INSERT trigger may tell us to skip the tuple. */
+	if (resultRelInfo->ri_TrigDesc &&
+		resultRelInfo->ri_TrigDesc->trig_insert_before_row &&
+		!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+		return;					/* "do nothing" */
+
+	/* Compute stored generated columns */
+	if (target->rd_att->constr &&
+		target->rd_att->constr->has_generated_stored)
+		ExecComputeStoredGenerated(resultRelInfo, estate, slot,
+								   CMD_INSERT);
+
+	/* Table constraints first, then the partition constraint if any. */
+	if (target->rd_att->constr)
+		ExecConstraints(resultRelInfo, slot, estate);
+	if (target->rd_rel->relispartition)
+		ExecPartitionCheck(resultRelInfo, slot, estate, true);
+
+	/* Tuple accepted: hand it to the multi-insert buffer. */
+	table_modify_buffer_insert(MultiInsertState, slot);
+}
+
 /*
  * Find the searchslot tuple and update it with data in the slot,
  * update the indexes, and execute any constraints and per-row triggers.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 95c09c9516..46d38aebd2 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -427,6 +427,30 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	logicalrep_write_tuple(out, rel, newslot, binary, columns);
 }
 
+/*
+ * Read the 4-byte relation id from the message in 'in' and return it.
+ *
+ * Nothing beyond the relation id is consumed from the message.
+ */
+LogicalRepRelId
+logicalrep_read_relid(StringInfo in)
+{
+	return pq_getmsgint(in, 4);
+}
+
+/* Read the 'N' marker and the new tuple of an INSERT into 'newtup'. */
+void
+logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup)
+{
+	char		marker = pq_getmsgbyte(in);
+
+	/* Anything other than 'N' is reported as an error. */
+	if (marker != 'N')
+		elog(ERROR, "expected new tuple but got %d", marker);
+
+	logicalrep_read_tuple(in, newtup);
+}
+
 /*
  * Read INSERT from stream.
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..d62772f590 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -148,7 +148,6 @@
 #include <unistd.h>
 
 #include "access/table.h"
-#include "access/tableam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
@@ -416,6 +415,30 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+typedef enum LRMultiInsertReturnStatus	/* result of do_multi_inserts() */
+{
+	LR_MULTI_INSERT_NONE = 0,	/* zero value; do_multi_inserts() never returns it */
+	LR_MULTI_INSERT_REL_SKIPPED,	/* should_apply_changes_for_rel() said no; nothing inserted */
+	LR_MULTI_INSERT_DISALLOWED,	/* target is a partitioned table; caller does a single insert */
+	LR_MULTI_INSERT_DONE,		/* tuple was passed to ExecRelationMultiInsert() */
+} LRMultiInsertReturnStatus;
+
+static TableModifyState *MultiInsertState = NULL;	/* multi-insert in progress; NULL when none */
+static LogicalRepRelMapEntry *LastRel = NULL;	/* relation opened for that multi-insert */
+static LogicalRepRelId LastMultiInsertRelId = InvalidOid;	/* relation id read for that multi-insert */
+static ApplyExecutionData *LastEData = NULL;	/* executor data created when it was started */
+static TupleTableSlot *LastRemoteSlot = NULL;	/* slot cleared and refilled for each incoming tuple */
+
+typedef struct LRModifyBufferFlushContext	/* context handed to LRModifyBufferFlushCallback() */
+{
+	ResultRelInfo *resultRelInfo;	/* set from edata->targetRelInfo in do_multi_inserts() */
+	EState	   *estate;			/* set from edata->estate in do_multi_inserts() */
+} LRModifyBufferFlushContext;
+
+static LRModifyBufferFlushContext *modify_buffer_flush_context = NULL;
+static void LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot);
+static void FinishMultiInserts(void);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -1017,6 +1040,8 @@ apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit(s, &commit_data);
 
 	if (commit_data.commit_lsn != remote_final_lsn)
@@ -1043,6 +1068,8 @@ apply_handle_begin_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData begin_data;
 
+	FinishMultiInserts();
+
 	/* Tablesync should never receive prepare. */
 	if (am_tablesync_worker())
 		ereport(ERROR,
@@ -1109,6 +1136,8 @@ apply_handle_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData prepare_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_prepare(s, &prepare_data);
 
 	if (prepare_data.prepare_lsn != remote_final_lsn)
@@ -1171,6 +1200,8 @@ apply_handle_commit_prepared(StringInfo s)
 	LogicalRepCommitPreparedTxnData prepare_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit_prepared(s, &prepare_data);
 	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
 
@@ -1220,6 +1251,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	LogicalRepRollbackPreparedTxnData rollback_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_rollback_prepared(s, &rollback_data);
 	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
 
@@ -1277,6 +1310,8 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1304,6 +1339,8 @@ apply_handle_stream_prepare(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
 								   prepare_data.xid, prepare_data.prepare_lsn);
 
+			FinishMultiInserts();
+
 			/* Mark the transaction as prepared. */
 			apply_handle_prepare_internal(&prepare_data);
 
@@ -1407,6 +1444,8 @@ apply_handle_stream_prepare(StringInfo s)
 static void
 apply_handle_origin(StringInfo s)
 {
+	FinishMultiInserts();
+
 	/*
 	 * ORIGIN message can only come inside streaming transaction or inside
 	 * remote transaction and before any actual writes.
@@ -1473,6 +1512,8 @@ apply_handle_stream_start(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1628,6 +1669,8 @@ apply_handle_stream_stop(StringInfo s)
 	ParallelApplyWorkerInfo *winfo;
 	TransApplyAction apply_action;
 
+	FinishMultiInserts();
+
 	if (!in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1821,6 +1864,8 @@ apply_handle_stream_abort(StringInfo s)
 	StringInfoData original_msg = *s;
 	bool		toplevel_xact;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2138,6 +2183,8 @@ apply_handle_stream_commit(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2159,6 +2206,8 @@ apply_handle_stream_commit(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
 								   commit_data.commit_lsn);
 
+			FinishMultiInserts();
+
 			apply_handle_commit_internal(&commit_data);
 
 			/* Unlink the files with serialized changes and subxact info. */
@@ -2302,6 +2351,8 @@ apply_handle_relation(StringInfo s)
 {
 	LogicalRepRelation *rel;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
 		return;
 
@@ -2325,6 +2376,8 @@ apply_handle_type(StringInfo s)
 {
 	LogicalRepTyp typ;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
 		return;
 
@@ -2363,16 +2416,126 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
 						RelationGetRelationName(rel))));
 }
 
-/*
- * Handle INSERT message.
- */
+/* Flush and tear down the multi-insert begun by do_multi_inserts(), if any. */
+static void
+FinishMultiInserts(void)
+{
+	LogicalRepMsgType saved_command;
+	UserContext ucxt;
+	bool		run_as_owner;
+
+	if (MultiInsertState == NULL)
+		return;
+
+	Assert(OidIsValid(LastMultiInsertRelId));
+	Assert(LastEData != NULL);
+
+	/* Set relation and current command for error callback */
+	apply_error_callback_arg.rel = LastRel;
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	/* The flush runs index expressions and AR triggers: do it as table owner */
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(LastRel->localrel->rd_rel->relowner, &ucxt);
+
+	ExecDropSingleTupleTableSlot(LastRemoteSlot);
+	LastRemoteSlot = NULL;
+	table_modify_end(MultiInsertState);
+	MultiInsertState = NULL;
+	LastMultiInsertRelId = InvalidOid;
+	pfree(modify_buffer_flush_context);
+	modify_buffer_flush_context = NULL;
+	ExecCloseIndices(LastEData->targetRelInfo);
+	finish_edata(LastEData);
+	LastEData = NULL;
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
+	/* Reset the error callback state, then release the relation */
+	apply_error_callback_arg.rel = NULL;
+	apply_error_callback_arg.command = saved_command;
+	logicalrep_rel_close(LastRel, NoLock);
+	LastRel = NULL;
+	end_replication_step();
+}
 
 static void
-apply_handle_insert(StringInfo s)
+LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	LRModifyBufferFlushContext *ctx = (LRModifyBufferFlushContext *) context;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	LogicalRepMsgType saved_command;
+
+	/* Quick exit if no indexes or no triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = LastRel;
+
+	/* Set current command for error callback */
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 NULL);
+
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+	 * anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 NULL);
+	}
+
+	/*
+	 * XXX we should in theory pass a TransitionCaptureState object to the
+	 * above to capture transition tuples, but after statement triggers don't
+	 * actually get fired by replication yet anyway
+	 */
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	/* Reset the current command */
+	apply_error_callback_arg.command = saved_command;
+}
+
+static LRMultiInsertReturnStatus
+do_multi_inserts(StringInfo s, LogicalRepRelId *relid)
 {
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData newtup;
-	LogicalRepRelId relid;
 	UserContext ucxt;
 	ApplyExecutionData *edata;
 	EState	   *estate;
@@ -2380,17 +2543,143 @@ apply_handle_insert(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	/*
+	 * Try to add this INSERT to a multi-insert, starting one if needed.  On
+	 * any result other than LR_MULTI_INSERT_DONE, the relation is closed and
+	 * only the relation id (stored in *relid) has been read from 's'.
+	 */
+	*relid = logicalrep_read_relid(s);
+
+	/* A different relation ends the multi-insert in progress. */
+	if (MultiInsertState != NULL && LastMultiInsertRelId != InvalidOid &&
+		*relid != InvalidOid && LastMultiInsertRelId != *relid)
+		FinishMultiInserts();
+
+	/*
+	 * Begin the replication step only now: FinishMultiInserts() above ends
+	 * the step that the previous multi-insert was running in.
+	 */
+	if (MultiInsertState == NULL)
+	{
+		begin_replication_step();
+		rel = logicalrep_rel_open(*relid, RowExclusiveLock);
+	}
+	else
+		rel = LastRel;
+
+	if (!should_apply_changes_for_rel(rel))
+	{
+		Assert(MultiInsertState == NULL);
+		/* Changes for this relation are not applied: unlock it and stop. */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_REL_SKIPPED;
+	}
+
+	/* For a partitioned table, let's not do multi inserts. */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		Assert(MultiInsertState == NULL);
+		/* Caller falls back to do_single_inserts(), which reopens it. */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_DISALLOWED;
+	}
+
 	/*
-	 * Quick return if we are skipping data modification changes or handling
-	 * streamed transactions.
+	 * Make sure that any user-supplied code runs as the table owner, unless
+	 * the user has opted out of that behavior.
 	 */
-	if (is_skipping_changes() ||
-		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
-		return;
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = rel;
+
+	if (MultiInsertState == NULL)
+	{
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+		/* Initialize the executor state. */
+		LastEData = edata = create_edata_for_relation(rel);
+		estate = edata->estate;
+
+		LastRemoteSlot = remoteslot = MakeTupleTableSlot(RelationGetDescr(rel->localrel),
+														 &TTSOpsVirtual);
+
+		modify_buffer_flush_context = (LRModifyBufferFlushContext *) palloc(sizeof(LRModifyBufferFlushContext));
+		modify_buffer_flush_context->resultRelInfo = edata->targetRelInfo;
+		modify_buffer_flush_context->estate = estate;
+
+		MultiInsertState = table_modify_begin(edata->targetRelInfo->ri_RelationDesc,
+											  TM_FLAG_MULTI_INSERTS |
+											  TM_FLAG_BAS_BULKWRITE,
+											  GetCurrentCommandId(true),
+											  0,
+											  LRModifyBufferFlushCallback,
+											  modify_buffer_flush_context);
+		LastRel = rel;
+		LastMultiInsertRelId = *relid;
+
+		/* We must open indexes here. */
+		ExecOpenIndices(edata->targetRelInfo, false);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		CommandId	cid;
+
+		edata = LastEData;
+		estate = edata->estate;
+		ResetExprContext(GetPerTupleExprContext(estate));
+		ExecClearTuple(LastRemoteSlot);
+		remoteslot = LastRemoteSlot;
+		cid = GetCurrentCommandId(true);
+		MultiInsertState->cid = cid;
+		estate->es_output_cid = cid;
+	}
+
+	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
+	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	slot_store_data(remoteslot, rel, &newtup);
+	slot_fill_defaults(rel, estate, remoteslot);
+	MemoryContextSwitchTo(oldctx);
+
+	TargetPrivilegesCheck(edata->targetRelInfo->ri_RelationDesc, ACL_INSERT);
+	ExecRelationMultiInsert(MultiInsertState, edata->targetRelInfo, estate, remoteslot);
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
+	Assert(MultiInsertState != NULL);
+
+	CommandCounterIncrement();
+
+	return LR_MULTI_INSERT_DONE;
+}
+
+static bool
+do_single_inserts(StringInfo s, LogicalRepRelId relid)
+{
+	LogicalRepRelMapEntry *rel;
+	LogicalRepTupleData newtup;
+	UserContext ucxt;
+	ApplyExecutionData *edata;
+	EState	   *estate;
+	TupleTableSlot *remoteslot;
+	MemoryContext oldctx;
+	bool		run_as_owner;
+
+	Assert(relid != InvalidOid);
 
 	begin_replication_step();
 
-	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
 	if (!should_apply_changes_for_rel(rel))
 	{
@@ -2400,7 +2689,7 @@ apply_handle_insert(StringInfo s)
 		 */
 		logicalrep_rel_close(rel, RowExclusiveLock);
 		end_replication_step();
-		return;
+		return false;
 	}
 
 	/*
@@ -2422,6 +2711,7 @@ apply_handle_insert(StringInfo s)
 										&TTSOpsVirtual);
 
 	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	slot_store_data(remoteslot, rel, &newtup);
 	slot_fill_defaults(rel, estate, remoteslot);
@@ -2446,6 +2736,35 @@ apply_handle_insert(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	end_replication_step();
+
+	return true;
+}
+
+/*
+ * Handle INSERT message.  The insert is offered to do_multi_inserts() first;
+ * do_single_inserts() runs only when that call neither buffered the tuple
+ * nor skipped the relation.
+ */
+static void
+apply_handle_insert(StringInfo s)
+{
+	LogicalRepRelId target_relid;
+
+	/* Skipped changes and streamed transactions are not applied here. */
+	if (is_skipping_changes())
+		return;
+	if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+		return;
+
+	switch (do_multi_inserts(s, &target_relid))
+	{
+		case LR_MULTI_INSERT_REL_SKIPPED:
+		case LR_MULTI_INSERT_DONE:
+			break;
+		default:
+			do_single_inserts(s, target_relid);
+			break;
+	}
 }
 
 /*
@@ -2532,6 +2851,8 @@ apply_handle_update(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -2713,6 +3034,8 @@ apply_handle_delete(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -3154,6 +3477,8 @@ apply_handle_truncate(StringInfo s)
 	ListCell   *lc;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 9770752ea3..8f10ea977b 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -14,6 +14,7 @@
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "access/tableam.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
 #include "nodes/lockoptions.h"
@@ -656,6 +657,9 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
+extern void ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+									ResultRelInfo *resultRelInfo,
+									EState *estate, TupleTableSlot *slot);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 									 EState *estate, EPQState *epqstate,
 									 TupleTableSlot *searchslot, TupleTableSlot *slot);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..3f3a7f0a31 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -226,6 +226,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
 									bool binary, Bitmapset *columns);
+extern LogicalRepRelId logicalrep_read_relid(StringInfo in);
+extern void logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 57aabf51d8..9582503bb4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1454,6 +1454,8 @@ LPTHREAD_START_ROUTINE
 LPTSTR
 LPVOID
 LPWSTR
+LRModifyBufferFlushContext
+LRMultiInsertReturnStatus
 LSEG
 LUID
 LVRelState
-- 
2.34.1

From 684d846c53dd47e8b1654d8a58e8cca0194bdf12 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 29 Apr 2024 05:35:58 +0000
Subject: [PATCH v20 5/5] Use new multi insert Table AM for COPY FROM

---
 contrib/test_decoding/expected/stream.out |   2 +-
 src/backend/commands/copyfrom.c           | 235 ++++++++++++++--------
 src/include/commands/copyfrom_internal.h  |   4 +-
 src/tools/pgindent/typedefs.list          |   1 +
 4 files changed, 160 insertions(+), 82 deletions(-)

diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 4ab2d47bf8..c19facb3c9 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -101,10 +101,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  streaming change for transaction
  streaming change for transaction
- streaming change for transaction
  closing a streamed block for transaction
  opening a streamed block for transaction
  streaming change for transaction
+ streaming change for transaction
  closing a streamed block for transaction
  committing streamed transaction
 (17 rows)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index ce4d62e707..403adfe481 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -71,14 +71,25 @@
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
+typedef struct CopyModifyBufferFlushContext	/* context registered with CopyModifyBufferFlushCallback */
+{
+	CopyFromState cstate;		/* COPY state given to CopyMultiInsertBufferInit() */
+	ResultRelInfo *resultRelInfo;	/* relation the owning buffer inserts into */
+	EState	   *estate;			/* executor state given to CopyMultiInsertBufferInit() */
+} CopyModifyBufferFlushContext;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableModifyState *mstate;	/* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples; allocated only for a foreign table, else NULL */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
+	TupleTableSlot *multislot;	/* NOTE(review): set to NULL at init; its use is not visible here */
+	CopyModifyBufferFlushContext *modify_buffer_flush_context;	/* Flush callback context; set only when ri_FdwRoutine == NULL */
 	int			nused;			/* number of 'slots' containing tuples */
+	int			currslotno;		/* Current buffered slot number that's being
+								 * flushed; Used to get correct cur_lineno for
+								 * errors while in flush callback. */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
 } CopyMultiInsertBuffer;
@@ -99,6 +110,7 @@ typedef struct CopyMultiInsertInfo
 	int			ti_options;		/* table insert options */
 } CopyMultiInsertInfo;
 
+static void CopyModifyBufferFlushCallback(void *context, TupleTableSlot *slot);
 
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
@@ -218,14 +230,38 @@ CopyLimitPrintoutLength(const char *str)
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						  CopyFromState cstate, EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		buffer->modify_buffer_flush_context = (CopyModifyBufferFlushContext *) palloc(sizeof(CopyModifyBufferFlushContext));
+		buffer->modify_buffer_flush_context->cstate = cstate;
+		buffer->modify_buffer_flush_context->resultRelInfo = rri;
+		buffer->modify_buffer_flush_context->estate = estate;
+
+		buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+											TM_FLAG_MULTI_INSERTS |
+											TM_FLAG_BAS_BULKWRITE,
+											miinfo->mycid,
+											miinfo->ti_options,
+											CopyModifyBufferFlushCallback,
+											buffer->modify_buffer_flush_context);
+		buffer->slots = NULL;
+		buffer->multislot = NULL;
+	}
+	else
+	{
+		buffer->mstate = NULL;
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		buffer->multislot = NULL;
+	}
+
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -236,11 +272,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
  */
 static inline void
 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
+							   ResultRelInfo *rri, CopyFromState cstate,
+							   EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -273,7 +310,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	 * tuples their way for the first time.
 	 */
 	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+		CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
 }
 
 /*
@@ -317,8 +354,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -390,13 +425,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -404,56 +434,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
-		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
-		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-		for (i = 0; i < nused; i++)
-		{
-			/*
-			 * If there are any indexes, update them for all the inserted
-			 * tuples, and run AFTER ROW INSERT triggers.
-			 */
-			if (resultRelInfo->ri_NumIndices > 0)
-			{
-				List	   *recheckIndexes;
-
-				cstate->cur_lineno = buffer->linenos[i];
-				recheckIndexes =
-					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
-										  false, NULL, NIL, false);
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], recheckIndexes,
-									 cstate->transition_capture);
-				list_free(recheckIndexes);
-			}
+		table_modify_buffer_flush(buffer->mstate);
 
-			/*
-			 * There's no indexes, but see if we need to run AFTER ROW INSERT
-			 * triggers anyway.
-			 */
-			else if (resultRelInfo->ri_TrigDesc != NULL &&
-					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-			{
-				cstate->cur_lineno = buffer->linenos[i];
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], NIL,
-									 cstate->transition_capture);
-			}
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-			ExecClearTuple(slots[i]);
-		}
+		/*
+		 * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+		 * in the flush callback CopyModifyBufferFlushCallback.
+		 */
 
 		/* Update the row counter and progress of the COPY command */
 		*processed += nused;
@@ -469,6 +461,60 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	buffer->nused = 0;
 }
 
+static void
+CopyModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	CopyModifyBufferFlushContext *ctx = (CopyModifyBufferFlushContext *) context;
+	CopyFromState cstate = ctx->cstate;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+
+	/* Quick exit if there are neither indexes nor triggers to process */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 cstate->transition_capture);
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+	 * anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 cstate->transition_capture);
+	}
+
+	Assert(buffer->currslotno <= buffer->nused);
+}
+
 /*
  * Drop used slots and free member for this buffer.
  *
@@ -489,19 +535,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
 	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
+		table_modify_end(buffer->mstate);
+		ExecDropSingleTupleTableSlot(buffer->multislot);
+		pfree(buffer->modify_buffer_flush_context);
 	}
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -588,13 +633,32 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused = buffer->nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(nused < MAX_BUFFERED_TUPLES);
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		if (buffer->multislot == NULL)
+			buffer->multislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+												   &TTSOpsVirtual);
+
+		/* Caller must clear the slot */
+		slot = buffer->multislot;
+	}
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -608,7 +672,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 
 	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+
+#ifdef USE_ASSERT_CHECKING
+	if (rri->ri_FdwRoutine != NULL)
+		Assert(slot == buffer->slots[buffer->nused]);
+#endif
 
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
@@ -616,6 +684,14 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	/* Record this slot as being used */
 	buffer->nused++;
 
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		Assert(slot == buffer->multislot);
+		buffer->currslotno = 0;
+
+		table_modify_buffer_insert(buffer->mstate, slot);
+	}
+
 	/* Update how many tuples are stored and their size */
 	miinfo->bufferedTuples++;
 	miinfo->bufferedBytes += tuplen;
@@ -830,7 +906,7 @@ CopyFrom(CopyFromState cstate)
 	/*
 	 * It's generally more efficient to prepare a bunch of tuples for
 	 * insertion, and insert them in one
-	 * table_multi_insert()/ExecForeignBatchInsert() call, than call
+	 * table_modify_buffer_flush()/ExecForeignBatchInsert() call, than call
 	 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
 	 * However, there are a number of reasons why we might not be able to do
 	 * this.  These are explained below.
@@ -1080,7 +1156,8 @@ CopyFrom(CopyFromState cstate)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
 						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+													   resultRelInfo, cstate,
+													   estate);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
 						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
 typedef enum CopyInsertMethod
 {
 	CIM_SINGLE,					/* use table_tuple_insert or ExecForeignInsert */
-	CIM_MULTI,					/* always use table_multi_insert or
+	CIM_MULTI,					/* always use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert */
-	CIM_MULTI_CONDITIONAL,		/* use table_multi_insert or
+	CIM_MULTI_CONDITIONAL,		/* use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9582503bb4..0f0ad30188 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -486,6 +486,7 @@ CopyHeaderChoice
 CopyInsertMethod
 CopyMethod
 CopyLogVerbosityChoice
+CopyModifyBufferFlushContext
 CopyMultiInsertBuffer
 CopyMultiInsertInfo
 CopyOnErrorChoice
-- 
2.34.1

Reply via email to