From c8277e6f5e9a72baebe993b2241d34a7d427473d Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <rupiredd@amazon.com>
Date: Wed, 30 Oct 2024 17:13:20 +0000
Subject: [PATCH v25 1/3] Introduce new table AM for multi-inserts

Until now, it's the COPY ... FROM command using multi inserts
(i.e. buffer some tuples and inserts them to table at once).
Basic idea of multi-inserts is that less WAL and reduced buffer
locking. Multi-inserts is faster than calling heap_insert() in a
loop, because when multiple tuples can be inserted on a single
page, we can write just a single WAL record covering all of them,
and only need to lock/unlock the page once.

Various other commands can benefit from this multi-inserts logic
[Reusable].

Also, there's a need to have these multi-inserts AMs (Access
Methods) as scan-like API [Usability]. With this, various table
AMs can define their own buffering and flushing strategy
[Flexibility] based on the way they store the data in the
underlying storage (e.g. columnar).

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/20200924024128.kyk3r5g7dnu3fxxx@alap3.anarazel.de
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/access/heap/heapam.c         | 211 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   6 +
 src/backend/access/table/tableamapi.c    |   5 +
 src/include/access/heapam.h              |  39 +++++
 src/include/access/tableam.h             |  81 +++++++++
 src/tools/pgindent/typedefs.list         |   3 +
 6 files changed, 344 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1748eafa10..69b21cf12c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -50,6 +50,7 @@
 #include "storage/procarray.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/spccache.h"
 
 
@@ -102,7 +103,7 @@ static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
-
+static void heap_modify_insert_end(TableModifyState *state);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2603,6 +2604,214 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel,
+				  CommandId cid,
+				  int options,
+				  TableModifyBufferFlushCb buffer_flush_cb,
+				  void *buffer_flush_ctx)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(TopTransactionContext,
+									"heap_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc(sizeof(TableModifyState));
+	state->rel = rel;
+	state->cid = cid;
+	state->options = options;
+	state->mem_ctx = context;
+	state->buffer_flush_cb = buffer_flush_cb;
+	state->buffer_flush_ctx = buffer_flush_ctx;
+	state->data = NULL;			/* To be set lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+						  TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_ctx);
+
+	/* First time through, initialize heap insert state */
+	if (state->data == NULL)
+	{
+		istate = (HeapInsertState *) palloc(sizeof(HeapInsertState));
+		istate->bistate = NULL;
+		istate->mistate = NULL;
+		state->data = istate;
+		mistate =
+			(HeapMultiInsertState *) palloc(sizeof(HeapMultiInsertState));
+		mistate->slots =
+			(TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+		mistate->cur_slots = 0;
+		istate->mistate = mistate;
+
+		/*
+		 * heap_multi_insert() can leak memory. So switch to this memory
+		 * context before every heap_multi_insert() call and reset when
+		 * finished.
+		 */
+		mistate->mem_ctx = AllocSetContextCreate(CurrentMemoryContext,
+												 "heap_multi_insert memory context",
+												 ALLOCSET_DEFAULT_SIZES);
+
+		if ((state->options & HEAP_INSERT_BAS_BULKWRITE) != 0)
+			istate->bistate = GetBulkInsertState();
+	}
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+	dstslot = mistate->slots[mistate->cur_slots];
+
+	if (dstslot == NULL)
+	{
+		/*
+		 * We use virtual tuple slots buffered slots for leveraging the
+		 * optimization it provides to minimize physical data copying. The
+		 * virtual slot gets materialized when we copy (via below
+		 * ExecCopySlot) the tuples from the source slot which can be of any
+		 * type. This way, it is ensured that the tuple storage doesn't depend
+		 * on external memory, because all the datums that aren't passed by
+		 * value are copied into the slot's memory context.
+		 */
+		dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+									 &TTSOpsVirtual);
+
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	Assert(TTS_IS_VIRTUAL(dstslot));
+
+	/*
+	 * Note that the copy clears the previous destination slot contents, so no
+	 * need to explicitly ExecClearTuple() here.
+	 */
+	ExecCopySlot(dstslot, slot);
+
+	mistate->cur_slots++;
+
+	if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS)
+		heap_modify_buffer_flush(state);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	/* Quick exit if we have flushed already */
+	if (mistate->cur_slots == 0)
+		return;
+
+	/*
+	 * heap_multi_insert() can leak memory, so switch to short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(mistate->mem_ctx);
+	heap_multi_insert(state->rel,
+					  mistate->slots,
+					  mistate->cur_slots,
+					  state->cid,
+					  state->options,
+					  istate->bistate);
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(mistate->mem_ctx);
+
+	/*
+	 * Invoke caller-supplied buffer flush callback after inserting rows from
+	 * the buffers to heap.
+	 */
+	if (state->buffer_flush_cb != NULL)
+	{
+		for (int i = 0; i < mistate->cur_slots; i++)
+		{
+			state->buffer_flush_cb(state->buffer_flush_ctx,
+								   mistate->slots[i]);
+		}
+	}
+
+	mistate->cur_slots = 0;
+}
+
+/*
+ * Heap insert specific function used for performing work at the end like
+ * flushing remaining buffered tuples, cleaning up the insert state and tuple
+ * table slots used for buffered tuples etc.
+ */
+static void
+heap_modify_insert_end(TableModifyState *state)
+{
+	HeapInsertState *istate;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+
+	if (istate->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate = istate->mistate;
+
+		heap_modify_buffer_flush(state);
+
+		Assert(mistate->cur_slots == 0);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		MemoryContextDelete(mistate->mem_ctx);
+	}
+
+	if (istate->bistate != NULL)
+		FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+	heap_modify_insert_end(state);
+	MemoryContextDelete(state->mem_ctx);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index a8d95e0f1c..d2ef6b4b78 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2644,6 +2644,12 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_modify_begin = heap_modify_begin,
+	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
+	.tuple_modify_buffer_flush = heap_modify_buffer_flush,
+	.tuple_modify_end = heap_modify_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index e9b598256f..772f29b1b5 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,11 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->scan_sample_next_block != NULL);
 	Assert(routine->scan_sample_next_tuple != NULL);
 
+	Assert(routine->tuple_modify_begin != NULL);
+	Assert(routine->tuple_modify_buffer_insert != NULL);
+	Assert(routine->tuple_modify_buffer_flush != NULL);
+	Assert(routine->tuple_modify_end != NULL);
+
 	return routine;
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 96cf82f97b..a9722ce947 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -37,6 +37,7 @@
 #define HEAP_INSERT_FROZEN		TABLE_INSERT_FROZEN
 #define HEAP_INSERT_NO_LOGICAL	TABLE_INSERT_NO_LOGICAL
 #define HEAP_INSERT_SPECULATIVE 0x0010
+#define HEAP_INSERT_BAS_BULKWRITE	TABLE_INSERT_BAS_BULKWRITE
 
 /* "options" flag bits for heap_page_prune_and_freeze */
 #define HEAP_PAGE_PRUNE_MARK_UNUSED_NOW		(1 << 0)
@@ -272,6 +273,33 @@ typedef enum
 	PRUNE_VACUUM_CLEANUP,		/* VACUUM 2nd heap pass */
 } PruneReason;
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+typedef struct HeapMultiInsertState
+{
+	/* Array of buffered slots */
+	TupleTableSlot **slots;
+
+	/* Number of buffered slots currently held */
+	int			cur_slots;
+
+	/* Memory context for dealing with multi inserts */
+	MemoryContext mem_ctx;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;
+	HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -322,6 +350,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+										   CommandId cid,
+										   int options,
+										   TableModifyBufferFlushCb buffer_flush_cb,
+										   void *buffer_flush_ctx);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+									  TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index adb478a93c..57b71eef38 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -254,11 +254,42 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+struct TableModifyState;
+
+/* Callback invoked upon flushing each buffered tuple */
+typedef void (*TableModifyBufferFlushCb) (void *context,
+										  TupleTableSlot *slot);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+	/* These fields are used for inserts for now */
+
+	Relation	rel;			/* Relation to insert to */
+	CommandId	cid;			/* Command ID for insert */
+	int			options;		/* TABLE_INSERT options */
+
+	/* Memory context for dealing with modify state variables */
+	MemoryContext mem_ctx;
+
+	/* Flush callback and its context used for multi inserts */
+	TableModifyBufferFlushCb buffer_flush_cb;
+	void	   *buffer_flush_ctx;
+
+	/* Table AM specific data */
+	void	   *data;
+} TableModifyState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
 #define TABLE_INSERT_FROZEN			0x0004
 #define TABLE_INSERT_NO_LOGICAL		0x0008
+/*
+ * Use BAS_BULKWRITE buffer access strategy. 0x0010 is for
+ * HEAP_INSERT_SPECULATIVE.
+ */
+#define TABLE_INSERT_BAS_BULKWRITE	0x0020
 
 /* flag bits for table_tuple_lock */
 /* Follow tuples whose update is in progress if lock modes don't conflict  */
@@ -577,6 +608,21 @@ typedef struct TableAmRoutine
 	void		(*finish_bulk_insert) (Relation rel, int options);
 
 
+	/* ------------------------------------------------------------------------
+	 * Table Modify related functions.
+	 * ------------------------------------------------------------------------
+	 */
+	TableModifyState *(*tuple_modify_begin) (Relation rel,
+											 CommandId cid,
+											 int options,
+											 TableModifyBufferFlushCb buffer_flush_cb,
+											 void *buffer_flush_ctx);
+	void		(*tuple_modify_buffer_insert) (TableModifyState *state,
+											   TupleTableSlot *slot);
+	void		(*tuple_modify_buffer_flush) (TableModifyState *state);
+	void		(*tuple_modify_end) (TableModifyState *state);
+
+
 	/* ------------------------------------------------------------------------
 	 * DDL related functionality.
 	 * ------------------------------------------------------------------------
@@ -1608,6 +1654,41 @@ table_finish_bulk_insert(Relation rel, int options)
 		rel->rd_tableam->finish_bulk_insert(rel, options);
 }
 
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+static inline TableModifyState *
+table_modify_begin(Relation rel,
+				   CommandId cid,
+				   int options,
+				   TableModifyBufferFlushCb buffer_flush_cb,
+				   void *buffer_flush_ctx)
+{
+	return rel->rd_tableam->tuple_modify_begin(rel,
+											   cid,
+											   options,
+											   buffer_flush_cb,
+											   buffer_flush_ctx);
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+	state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+	state->rel->rd_tableam->tuple_modify_end(state);
+}
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 171a7dd5d2..e7ddf29c16 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1147,6 +1147,8 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
 HeapPageFreeze
 HeapScanDesc
 HeapTuple
@@ -2873,6 +2875,7 @@ TableFuncScanState
 TableFuncType
 TableInfo
 TableLikeClause
+TableModifyState
 TableSampleClause
 TableScanDesc
 TableScanDescData
-- 
2.40.1

