On Wed, Mar 27, 2024 at 1:42 PM Jeff Davis <[email protected]> wrote:
>
> On Wed, 2024-03-27 at 01:19 +0530, Bharath Rupireddy wrote:
> >
> > Similarly, with this new AM, the onus lies on the table AM
> > implementers to provide an implementation for these new AMs even if
> > they just do single inserts.
>
> Why not fall back to using the plain tuple_insert? Surely some table
> AMs might be simple and limited, and we shouldn't break them just
> because they don't implement the new APIs.
Hm. That might complicate table_modify_begin,
table_modify_buffer_insert and table_modify_end a bit. What do we put
in TableModifyState then? Do we create the bulk insert state
(BulkInsertStateData) outside? I think to give a better interface, can
we let TAM implementers support these new APIs in their own way? If
this sounds rather intrusive, we can just implement the fallback to
tuple_insert if these new API are not supported in the caller, for
example, do something like below in createas.c and matview.c.
Thoughts?
if (table_modify_buffer_insert() is defined)
table_modify_buffer_insert(...);
else
{
myState->bistate = GetBulkInsertState();
table_tuple_insert(...);
}
> > table_multi_insert needs to be there for sure as COPY ... FROM uses
> > it.
>
> After we have these new APIs fully in place and used by COPY, what will
> happen to those other APIs? Will they be deprecated or will there be a
> reason to keep them?
Deprecated perhaps?
Please find the attached v16 patches for further review.
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 85410b429917cf388c4b58883ddc304118c73143 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Sun, 31 Mar 2024 15:34:16 +0000
Subject: [PATCH v16 1/2] Introduce new table modify access methods
---
src/backend/access/heap/heapam.c | 189 ++++++++++++++++++++++-
src/backend/access/heap/heapam_handler.c | 5 +
src/backend/access/table/tableamapi.c | 4 +
src/include/access/heapam.h | 41 +++++
src/include/access/tableam.h | 58 +++++++
src/tools/pgindent/typedefs.list | 3 +
6 files changed, 299 insertions(+), 1 deletion(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b661d9811e..69f8c597d8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -64,6 +64,7 @@
#include "storage/standby.h"
#include "utils/datum.h"
#include "utils/inval.h"
+#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/spccache.h"
@@ -107,7 +108,8 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
bool *copy);
-
+static void heap_modify_buffer_flush(TableModifyState *state);
+static void heap_modify_insert_end(TableModifyState *state);
/*
* Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2441,6 +2443,191 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
*insert_indexes = true;
}
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel, int modify_flags, CommandId cid,
+ int options)
+{
+ TableModifyState *state;
+ MemoryContext context;
+ MemoryContext oldcontext;
+
+ context = AllocSetContextCreate(CurrentMemoryContext,
+ "heap_modify memory context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(context);
+ state = palloc0(sizeof(TableModifyState));
+ state->rel = rel;
+ state->modify_flags = modify_flags;
+ state->mctx = context;
+ state->cid = cid;
+ state->options = options;
+ state->insert_indexes = false;
+ state->modify_end_cb = NULL; /* To be installed lazily */
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot)
+{
+ TupleTableSlot *dstslot;
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(state->mctx);
+
+ /* First time through, initialize heap insert state */
+ if (state->data == NULL)
+ {
+ istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
+ istate->bistate = NULL;
+ istate->mistate = NULL;
+ state->data = istate;
+
+ if ((state->modify_flags & TM_FLAG_MULTI_INSERTS) != 0)
+ {
+ mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+ mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+ istate->mistate = mistate;
+ }
+
+ if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+ istate->bistate = GetBulkInsertState();
+
+ state->modify_end_cb = heap_modify_insert_end;
+ }
+
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+ Assert(istate->bistate != NULL);
+
+ dstslot = mistate->slots[mistate->cur_slots];
+ if (dstslot == NULL)
+ {
+ /*
+ * We use virtual tuple slots buffered slots for leveraging the
+ * optimization it provides to minimize physical data copying. The
+ * virtual slot gets materialized when we copy (via below
+ * ExecCopySlot) the tuples from the source slot which can be of any
+ * type. This way, it is ensured that the tuple storage doesn't depend
+ * on external memory, because all the datums that aren't passed by
+ * value are copied into the slot's memory context.
+ */
+ dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+ &TTSOpsVirtual);
+ mistate->slots[mistate->cur_slots] = dstslot;
+ }
+
+ ExecClearTuple(dstslot);
+ ExecCopySlot(dstslot, slot);
+
+ mistate->cur_slots++;
+
+ /*
+ * Memory allocated for the whole tuple is in slot's memory context, so
+ * use it keep track of the total space occupied by all buffered tuples.
+ */
+ if (TTS_SHOULDFREE(dstslot))
+ mistate->cur_size += MemoryContextMemAllocated(dstslot->tts_mcxt, false);
+
+ if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+ mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)
+ heap_modify_buffer_flush(state);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+static void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+ Assert(istate->bistate != NULL);
+
+ if (mistate->cur_slots == 0)
+ return;
+
+ oldcontext = MemoryContextSwitchTo(state->mctx);
+
+ heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+ state->cid, state->options, istate->bistate,
+ &state->insert_indexes);
+
+ mistate->cur_slots = 0;
+ mistate->cur_size = 0;
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Heap insert specific callback used for performing work at the end like
+ * flushing buffered tuples if any, cleaning up the insert state and buffered
+ * slots.
+ */
+static void
+heap_modify_insert_end(TableModifyState *state)
+{
+ HeapInsertState *istate;
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ istate = (HeapInsertState *) state->data;
+
+ if (istate->mistate != NULL)
+ {
+ HeapMultiInsertState *mistate = istate->mistate;
+
+ heap_modify_buffer_flush(state);
+
+ Assert(mistate->cur_slots == 0 &&
+ mistate->cur_size == 0);
+
+ for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(mistate->slots[i]);
+ }
+
+ if (istate->bistate != NULL)
+ FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+ if (state->modify_end_cb != NULL)
+ state->modify_end_cb(state);
+
+ MemoryContextDelete(state->mctx);
+}
+
/*
* simple_heap_insert - insert a tuple
*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 41a4bb0981..3d38da635d 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2649,6 +2649,11 @@ static const TableAmRoutine heapam_methods = {
.tuple_insert_speculative = heapam_tuple_insert_speculative,
.tuple_complete_speculative = heapam_tuple_complete_speculative,
.multi_insert = heap_multi_insert,
+
+ .tuple_modify_begin = heap_modify_begin,
+ .tuple_modify_buffer_insert = heap_modify_buffer_insert,
+ .tuple_modify_end = heap_modify_end,
+
.tuple_delete = heapam_tuple_delete,
.tuple_update = heapam_tuple_update,
.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index d9e23ef317..80d923bbdc 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -66,6 +66,10 @@ GetTableAmRoutine(Oid amhandler)
Assert(routine->tuple_insert != NULL);
+ Assert(routine->tuple_modify_begin != NULL);
+ Assert(routine->tuple_modify_buffer_insert != NULL);
+ Assert(routine->tuple_modify_end != NULL);
+
/*
* Could be made optional, but would require throwing error during
* parse-analysis.
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 32a3fbce96..4adfc1fb35 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -233,6 +233,36 @@ htsv_get_valid_status(int status)
return (HTSV_Result) status;
}
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS 1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES 65535
+
+typedef struct HeapMultiInsertState
+{
+ /* Array of buffered slots */
+ TupleTableSlot **slots;
+
+ /* Number of buffered slots currently held */
+ int cur_slots;
+
+ /* Approximate size of all tuples currently held in buffered slots */
+ Size cur_size;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+ struct BulkInsertStateData *bistate;
+ HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
/* ----------------
* function prototypes for heap access method
*
@@ -283,6 +313,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
int ntuples, CommandId cid, int options,
BulkInsertState bistate, bool *insert_indexes);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+ int modify_flags,
+ CommandId cid,
+ int options);
+
+extern void heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot);
+
+extern void heap_modify_end(TableModifyState *state);
+
extern TM_Result heap_delete(Relation relation, ItemPointer tid,
CommandId cid, Snapshot crosscheck, int options,
struct TM_FailureData *tmfd, bool changingPart,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index cf76fc29d4..de50f51078 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -248,6 +248,35 @@ typedef struct TM_IndexDeleteOp
TM_IndexStatus *status;
} TM_IndexDeleteOp;
+/* Table modify flags */
+
+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TM_FLAG_MULTI_INSERTS 0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TM_FLAG_BAS_BULKWRITE 0x000002
+
+struct TableModifyState;
+
+/* Table AM specific callback that gets called in table_modify_end() */
+typedef void (*TableModifyEndCP) (struct TableModifyState *state);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+ Relation rel;
+ int modify_flags;
+ MemoryContext mctx;
+ CommandId cid;
+ int options;
+ bool insert_indexes;
+
+ /* Table AM specific data starts here */
+ void *data;
+
+ TableModifyEndCP modify_end_cb;
+} TableModifyState;
+
/* "options" flag bits for table_tuple_insert */
/* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
#define TABLE_INSERT_SKIP_FSM 0x0002
@@ -534,6 +563,16 @@ typedef struct TableAmRoutine
CommandId cid, int options, struct BulkInsertStateData *bistate,
bool *insert_indexes);
+ TableModifyState *(*tuple_modify_begin) (Relation rel,
+ int modify_flags,
+ CommandId cid,
+ int options);
+
+ void (*tuple_modify_buffer_insert) (TableModifyState *state,
+ TupleTableSlot *slot);
+
+ void (*tuple_modify_end) (TableModifyState *state);
+
/* see table_tuple_delete() for reference about parameters */
TM_Result (*tuple_delete) (Relation rel,
ItemPointer tid,
@@ -1464,6 +1503,25 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
cid, options, bistate, insert_indexes);
}
+static inline TableModifyState *
+table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options)
+{
+ return rel->rd_tableam->tuple_modify_begin(rel, modify_flags,
+ cid, options);
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+ state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+ state->rel->rd_tableam->tuple_modify_end(state);
+}
+
/*
* Delete a tuple (and optionally lock the last tuple version).
*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a8d7bed411..f77c322709 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1122,6 +1122,8 @@ HeadlineJsonState
HeadlineParsedText
HeadlineWordEntry
HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
HeapPageFreeze
HeapScanDesc
HeapTuple
@@ -2809,6 +2811,7 @@ TableFuncScan
TableFuncScanState
TableInfo
TableLikeClause
+TableModifyState
TableSampleClause
TableScanDesc
TableScanDescData
--
2.34.1
From 32050367825d5f8dbb1330cc4f8ef7818eb544ed Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Sun, 31 Mar 2024 15:34:57 +0000
Subject: [PATCH v16 2/2] Optimize CTAS, CMV, RMV with multi inserts
---
src/backend/commands/createas.c | 27 +++++++++------------------
src/backend/commands/matview.c | 26 +++++++++-----------------
2 files changed, 18 insertions(+), 35 deletions(-)
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index afd3dace07..00c1271f93 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -53,9 +53,7 @@ typedef struct
/* These fields are filled by intorel_startup: */
Relation rel; /* relation to write to */
ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */
- CommandId output_cid; /* cmin to insert in output tuples */
- int ti_options; /* table_tuple_insert performance options */
- BulkInsertState bistate; /* bulk insert state */
+ TableModifyState *mstate; /* table insert state */
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -552,17 +550,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
*/
myState->rel = intoRelationDesc;
myState->reladdr = intoRelationAddr;
- myState->output_cid = GetCurrentCommandId(true);
- myState->ti_options = TABLE_INSERT_SKIP_FSM;
/*
* If WITH NO DATA is specified, there is no need to set up the state for
* bulk inserts as there are no tuples to insert.
*/
if (!into->skipData)
- myState->bistate = GetBulkInsertState();
+ myState->mstate = table_modify_begin(intoRelationDesc,
+ TM_FLAG_MULTI_INSERTS |
+ TM_FLAG_BAS_BULKWRITE,
+ GetCurrentCommandId(true),
+ TABLE_INSERT_SKIP_FSM);
else
- myState->bistate = NULL;
+ myState->mstate = NULL;
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -578,7 +578,6 @@ static bool
intorel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
- bool insertIndexes;
/* Nothing to insert if WITH NO DATA is specified. */
if (!myState->into->skipData)
@@ -591,12 +590,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
* would not be cheap either. This also doesn't allow accessing per-AM
* data (say a tuple's xmin), but since we don't do that here...
*/
- table_tuple_insert(myState->rel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate,
- &insertIndexes);
+ table_modify_buffer_insert(myState->mstate, slot);
}
/* We know this is a newly created relation, so there are no indexes */
@@ -614,10 +608,7 @@ intorel_shutdown(DestReceiver *self)
IntoClause *into = myState->into;
if (!into->skipData)
- {
- FreeBulkInsertState(myState->bistate);
- table_finish_bulk_insert(myState->rel, myState->ti_options);
- }
+ table_modify_end(myState->mstate);
/* close rel, but keep lock until commit */
table_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 9ec13d0984..f03aa1cff3 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -48,9 +48,7 @@ typedef struct
Oid transientoid; /* OID of new heap into which to store */
/* These fields are filled by transientrel_startup: */
Relation transientrel; /* relation to write to */
- CommandId output_cid; /* cmin to insert in output tuples */
- int ti_options; /* table_tuple_insert performance options */
- BulkInsertState bistate; /* bulk insert state */
+ TableModifyState *mstate; /* table insert state */
} DR_transientrel;
static int matview_maintenance_depth = 0;
@@ -458,9 +456,12 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
* Fill private fields of myState for use by later routines
*/
myState->transientrel = transientrel;
- myState->output_cid = GetCurrentCommandId(true);
- myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
- myState->bistate = GetBulkInsertState();
+ myState->mstate = table_modify_begin(transientrel,
+ TM_FLAG_MULTI_INSERTS |
+ TM_FLAG_BAS_BULKWRITE,
+ GetCurrentCommandId(true),
+ TABLE_INSERT_SKIP_FSM |
+ TABLE_INSERT_FROZEN);
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -476,7 +477,6 @@ static bool
transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
- bool insertIndexes;
/*
* Note that the input slot might not be of the type of the target
@@ -486,13 +486,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
* cheap either. This also doesn't allow accessing per-AM data (say a
* tuple's xmin), but since we don't do that here...
*/
-
- table_tuple_insert(myState->transientrel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate,
- &insertIndexes);
+ table_modify_buffer_insert(myState->mstate, slot);
/* We know this is a newly created relation, so there are no indexes */
@@ -507,9 +501,7 @@ transientrel_shutdown(DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
- FreeBulkInsertState(myState->bistate);
-
- table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ table_modify_end(myState->mstate);
/* close transientrel, but keep lock until commit */
table_close(myState->transientrel, NoLock);
--
2.34.1