On Thu, Oct 22, 2020 at 01:29:53PM +0100, Simon Riggs wrote:
> On Fri, 16 Oct 2020 at 22:05, Justin Pryzby <pry...@telsasoft.com> wrote:
> 
> > > > I made this conditional on BEGIN BULK/SET bulk, so I'll solicit 
> > > > comments on that.
> 
> I think it would be better if this was self-tuning. So that we don't
> allocate a bulkinsert state until we've done say 100 (?) rows
> inserted.

I made it optional and off by default in response to the legitimate concern
about performance regressions in cases where a loader needs to be as fast as
possible.  In our case, by contrast, we want to optimize for our reports by
making the loaders responsible for their own writes, rather than having them
leave behind many dirty pages and clobber the cache.

Also, INSERT SELECT doesn't immediately help us (telsasoft), since we use
INSERT .. VALUES () .. ON CONFLICT.  This would handle that case, which is
great, even though that wasn't a design goal.  It could also be an integer GUC
to allow configuring the size of the ring buffer.

> You should also use table_multi_insert() since that will give further
> performance gains by reducing block access overheads. Switching from
> single row to multi-row should also only happen once we've loaded a
> few rows, so we don't introduce overheads for smaller SQL statements.

Good idea.  multi_insert (which reduces the overhead of individual inserts) is
mostly independent of BulkInsertState (which uses a ring buffer to avoid
filling the cache with dirty pages).  I implemented it as patch 0002.

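This makes INSERT SELECT several times faster, and it no longer clobbers the
cache.  Below are the timings, each followed by the number of shared buffers
at each usage count, first without and then with the patches: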

Time: 4700.606 ms (00:04.701)
   123 |          1
    37 |          2
    20 |          3
    11 |          4
  4537 |          5
 11656 |           

Time: 1125.302 ms (00:01.125)
  2171 |          1
    37 |          2
    20 |          3
    11 |          4
   111 |          5
 14034 |           

When enabled, this passes nearly all regression tests, and all but 2 of the
resulting differences are easily understood.  The second patch still needs work.

-- 
Justin
>From 16057608bd58f54a5e365433ded18757aca8ec48 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Fri, 8 May 2020 02:17:32 -0500
Subject: [PATCH v5 1/2] Allow INSERT SELECT to use a BulkInsertState

---
 src/backend/executor/nodeModifyTable.c | 22 ++++++++++++++++++++--
 src/backend/parser/gram.y              |  7 ++++++-
 src/backend/tcop/utility.c             |  4 ++++
 src/backend/utils/misc/guc.c           | 11 +++++++++++
 src/include/executor/nodeModifyTable.h |  2 ++
 src/include/nodes/execnodes.h          |  3 +++
 src/include/parser/kwlist.h            |  1 +
 src/test/regress/expected/insert.out   | 23 +++++++++++++++++++++++
 src/test/regress/sql/insert.sql        | 13 +++++++++++++
 9 files changed, 83 insertions(+), 3 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 29e07b7228..26ff964105 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -72,6 +72,8 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
 											   ResultRelInfo *targetRelInfo,
 											   TupleTableSlot *slot,
 											   ResultRelInfo **partRelInfo);
+/* guc */
+bool insert_in_bulk = false;
 
 /*
  * Verify that the tuples to be produced by INSERT or UPDATE match the
@@ -594,7 +596,7 @@ ExecInsert(ModifyTableState *mtstate,
 			table_tuple_insert_speculative(resultRelationDesc, slot,
 										   estate->es_output_cid,
 										   0,
-										   NULL,
+										   NULL, /* Bulk insert not supported */
 										   specToken);
 
 			/* insert index entries for tuple */
@@ -631,10 +633,17 @@ ExecInsert(ModifyTableState *mtstate,
 		}
 		else
 		{
+			if (proute && mtstate->prevResultRelInfo != resultRelInfo)
+			{
+				if (mtstate->bistate)
+					ReleaseBulkInsertStatePin(mtstate->bistate);
+				mtstate->prevResultRelInfo = resultRelInfo;
+			}
+
 			/* insert the tuple normally */
 			table_tuple_insert(resultRelationDesc, slot,
 							   estate->es_output_cid,
-							   0, NULL);
+							   0, mtstate->bistate);
 
 			/* insert index entries for tuple */
 			if (resultRelInfo->ri_NumIndices > 0)
@@ -2232,6 +2241,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
 	mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans);
 	mtstate->mt_nplans = nplans;
+	mtstate->bistate = NULL;
+	if (operation == CMD_INSERT && insert_in_bulk)
+		mtstate->bistate = GetBulkInsertState();
 
 	/* set up epqstate with dummy subplan data for the moment */
 	EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam);
@@ -2698,6 +2710,12 @@ ExecEndModifyTable(ModifyTableState *node)
 														   resultRelInfo);
 	}
 
+	if (node->bistate)
+	{
+		FreeBulkInsertState(node->bistate);
+		table_finish_bulk_insert(node->rootResultRelInfo->ri_RelationDesc, 0);
+	}
+
 	/*
 	 * Close all the partitioned tables, leaf partitions, and their indices
 	 * and release the slot used for tuple routing, if set.
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 480d168346..1e3c23f723 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -631,7 +631,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	ASSERTION ASSIGNMENT ASYMMETRIC AT ATTACH ATTRIBUTE AUTHORIZATION
 
 	BACKWARD BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT
-	BOOLEAN_P BOTH BY
+	BOOLEAN_P BOTH BULK BY
 
 	CACHE CALL CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P
 	CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
@@ -9873,6 +9873,9 @@ transaction_mode_item:
 			| NOT DEFERRABLE
 					{ $$ = makeDefElem("transaction_deferrable",
 									   makeIntConst(false, @1), @1); }
+			| BULK
+					{ $$ = makeDefElem("bulk",
+									   makeIntConst(true, @1), @1); }
 		;
 
 /* Syntax with commas is SQL-spec, without commas is Postgres historical */
@@ -15079,6 +15082,7 @@ unreserved_keyword:
 			| BACKWARD
 			| BEFORE
 			| BEGIN_P
+			| BULK
 			| BY
 			| CACHE
 			| CALL
@@ -15590,6 +15594,7 @@ bare_label_keyword:
 			| BIT
 			| BOOLEAN_P
 			| BOTH
+			| BULK
 			| BY
 			| CACHE
 			| CALL
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 9a35147b26..cb3933b0e9 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -611,6 +611,10 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 									SetPGVariable("transaction_deferrable",
 												  list_make1(item->arg),
 												  true);
+								else if (strcmp(item->defname, "bulk") == 0)
+									SetPGVariable("bulk_insert",
+												  list_make1(item->arg),
+												  true);
 							}
 						}
 						break;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a62d64eaa4..5f2fc9004b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -54,6 +54,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/cost.h"
+#include "executor/nodeModifyTable.h"
 #include "optimizer/geqo.h"
 #include "optimizer/optimizer.h"
 #include "optimizer/paths.h"
@@ -2036,6 +2037,16 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"bulk_insert", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Sets the transaction to bulk insert mode."),
+			gettext_noop("A ring buffer of limited size will be used."),
+		},
+		&insert_in_bulk,
+		false,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 46a2dc9511..09c312a052 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -15,6 +15,8 @@
 
 #include "nodes/execnodes.h"
 
+extern PGDLLIMPORT bool insert_in_bulk;
+
 extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 									   EState *estate, TupleTableSlot *slot,
 									   CmdType cmdtype);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6c0a7d68d6..e034562877 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -14,6 +14,7 @@
 #ifndef EXECNODES_H
 #define EXECNODES_H
 
+#include "access/heapam.h"
 #include "access/tupconvert.h"
 #include "executor/instrument.h"
 #include "fmgr.h"
@@ -1176,6 +1177,8 @@ typedef struct ModifyTableState
 	List	  **mt_arowmarks;	/* per-subplan ExecAuxRowMark lists */
 	EPQState	mt_epqstate;	/* for evaluating EvalPlanQual rechecks */
 	bool		fireBSTriggers; /* do we need to fire stmt triggers? */
+	BulkInsertState	bistate;	/* state for bulk insert like INSERT SELECT */
+	ResultRelInfo	*prevResultRelInfo; /* last child inserted with bistate */
 
 	/*
 	 * Slot for storing tuples in the root partitioned table's rowtype during
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 71dcdf2889..0991da11e7 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -60,6 +60,7 @@ PG_KEYWORD("binary", BINARY, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("bit", BIT, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("boolean", BOOLEAN_P, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("both", BOTH, RESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("bulk", BULK, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("by", BY, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("cache", CACHE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("call", CALL, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index da50ee3b67..da0dae6240 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -462,6 +462,29 @@ Partitions: part_aa_bb FOR VALUES IN ('aa', 'bb'),
             part_xx_yy FOR VALUES IN ('xx', 'yy'), PARTITIONED,
             part_default DEFAULT, PARTITIONED
 
+-- bulk inserts
+truncate hash_parted;
+begin bulk;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+ a  
+----
+ 12
+ 11
+(2 rows)
+
+-- exercise bulk insert to partitions
+insert into hash_parted select generate_series(1,9999);
+select count(1) from hash_parted;
+ count 
+-------
+ 10001
+(1 row)
+
+commit;
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 963faa1614..d3a94f053b 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -280,6 +280,19 @@ from hash_parted order by part;
 -- partitions
 \d+ list_parted
 
+-- bulk inserts
+truncate hash_parted;
+begin bulk;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+-- exercise bulk insert to partitions
+insert into hash_parted select generate_series(1,9999);
+select count(1) from hash_parted;
+commit;
+
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
-- 
2.17.0

>From 5df306987950859ff532347ea5d6a6be954a25be Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 24 Oct 2020 22:49:01 -0500
Subject: [PATCH v5 2/2] Make INSERT SELECT use multi_insert

TODO: CTAS and matview ?

See also: 86b85044e823a304d2a265abc030254d39efe7df
---
 src/backend/commands/copy.c            | 185 +------------------------
 src/backend/executor/nodeModifyTable.c | 167 +++++++++++++++++++---
 src/include/commands/copy.h            | 185 +++++++++++++++++++++++++
 src/include/nodes/execnodes.h          |   9 +-
 src/test/regress/expected/insert.out   |  16 +++
 src/test/regress/sql/insert.sql        |   5 +
 6 files changed, 358 insertions(+), 209 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 36ddcdccdb..b613e6e43d 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -31,7 +31,6 @@
 #include "commands/defrem.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
-#include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
 #include "executor/tuptable.h"
 #include "foreign/fdwapi.h"
@@ -241,54 +240,6 @@ typedef struct
 } DR_copy;
 
 
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list.  Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES		1000
-
-/*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
- */
-#define MAX_BUFFERED_BYTES		65535
-
-/* Trim the list of buffers back down to this number after flushing */
-#define MAX_PARTITION_BUFFERS	32
-
-/* Stores multi-insert data related to a single relation in CopyFrom. */
-typedef struct CopyMultiInsertBuffer
-{
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
-	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel */
-	int			nused;			/* number of 'slots' containing tuples */
-	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
-												 * stream */
-} CopyMultiInsertBuffer;
-
-/*
- * Stores one or many CopyMultiInsertBuffers and details about the size and
- * number of tuples which are stored in them.  This allows multiple buffers to
- * exist at once when COPYing into a partitioned table.
- */
-typedef struct CopyMultiInsertInfo
-{
-	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
-	int			bufferedTuples; /* number of tuples buffered over all buffers */
-	int			bufferedBytes;	/* number of bytes from all buffered tuples */
-	CopyState	cstate;			/* Copy state for this CopyMultiInsertInfo */
-	EState	   *estate;			/* Executor state used for COPY */
-	CommandId	mycid;			/* Command Id used for COPY */
-	int			ti_options;		/* table insert options */
-} CopyMultiInsertInfo;
-
-
 /*
  * These macros centralize code used to process line_buf and raw_buf buffers.
  * They are macros because they often do continue/break control and to avoid
@@ -2386,48 +2337,13 @@ limit_printout_length(const char *str)
 	return res;
 }
 
-/*
- * Allocate memory and initialize a new CopyMultiInsertBuffer for this
- * ResultRelInfo.
- */
-static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
-
-	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
-	buffer->resultRelInfo = rri;
-	buffer->bistate = GetBulkInsertState();
-	buffer->nused = 0;
-
-	return buffer;
-}
-
-/*
- * Make a new buffer for this ResultRelInfo.
- */
-static inline void
-CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
-
-	buffer = CopyMultiInsertBufferInit(rri);
-
-	/* Setup back-link so we can easily find this buffer again */
-	rri->ri_CopyMultiInsertBuffer = buffer;
-	/* Record that we're tracking this buffer */
-	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-}
-
 /*
  * Initialize an already allocated CopyMultiInsertInfo.
  *
  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
  * for that table.
  */
-static void
+void
 CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 						CopyState cstate, EState *estate, CommandId mycid,
 						int ti_options)
@@ -2449,27 +2365,6 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
 }
 
-/*
- * Returns true if the buffers are full
- */
-static inline bool
-CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
-{
-	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
-		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
-		return true;
-	return false;
-}
-
-/*
- * Returns true if we have no buffered tuples
- */
-static inline bool
-CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
-{
-	return miinfo->bufferedTuples == 0;
-}
-
 /*
  * Write the tuples stored in 'buffer' out to the table.
  */
@@ -2554,35 +2449,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	cstate->cur_lineno = save_cur_lineno;
 }
 
-/*
- * Drop used slots and free member for this buffer.
- *
- * The buffer must be flushed before cleanup.
- */
-static inline void
-CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
-							 CopyMultiInsertBuffer *buffer)
-{
-	int			i;
-
-	/* Ensure buffer was flushed */
-	Assert(buffer->nused == 0);
-
-	/* Remove back-link to ourself */
-	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
-
-	FreeBulkInsertState(buffer->bistate);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
-
-	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-							 miinfo->ti_options);
-
-	pfree(buffer);
-}
-
 /*
  * Write out all stored tuples in all buffers out to the tables.
  *
@@ -2639,7 +2505,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
 /*
  * Cleanup allocated buffers and free memory
  */
-static inline void
+void
 CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
 {
 	ListCell   *lc;
@@ -2650,53 +2516,6 @@ CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
 	list_free(miinfo->multiInsertBuffers);
 }
 
-/*
- * Get the next TupleTableSlot that the next tuple should be stored in.
- *
- * Callers must ensure that the buffer is not full.
- *
- * Note: 'miinfo' is unused but has been included for consistency with the
- * other functions in this area.
- */
-static inline TupleTableSlot *
-CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
-								ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-	int			nused = buffer->nused;
-
-	Assert(buffer != NULL);
-	Assert(nused < MAX_BUFFERED_TUPLES);
-
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
-}
-
-/*
- * Record the previously reserved TupleTableSlot that was reserved by
- * CopyMultiInsertInfoNextFreeSlot as being consumed.
- */
-static inline void
-CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						 TupleTableSlot *slot, int tuplen, uint64 lineno)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-
-	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
-
-	/* Store the line number so we can properly report any errors later */
-	buffer->linenos[buffer->nused] = lineno;
-
-	/* Record this slot as being used */
-	buffer->nused++;
-
-	/* Update how many tuples are stored and their size */
-	miinfo->bufferedTuples++;
-	miinfo->bufferedBytes += tuplen;
-}
-
 /*
  * Copy FROM file to relation.
  */
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 26ff964105..1437acfc22 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -43,6 +43,7 @@
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "commands/trigger.h"
+#include "commands/copy.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
@@ -362,6 +363,65 @@ ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 	MemoryContextSwitchTo(oldContext);
 }
 
+/*
+ * Flush all tuples buffered in miinfo.  Adapted from CopyMultiInsertInfoFlush;
+ * miinfo->cstate holds the owning ModifyTableState here, not a CopyState.
+ */
+static void
+InsertMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo)
+{
+	ModifyTableState *mtstate = (ModifyTableState *) miinfo->cstate;
+	MemoryContext oldcontext;
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+	{
+		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
+		ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
+
+		/* Skip empty buffers; ExecInsert also calls this with none pending. */
+		if (buffer->nused == 0)
+			continue;
+
+		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(miinfo->estate));
+		table_multi_insert(resultRelInfo->ri_RelationDesc, buffer->slots,
+						   buffer->nused, miinfo->mycid, miinfo->ti_options,
+						   buffer->bistate);
+		MemoryContextSwitchTo(oldcontext);
+
+		for (int i = 0; i < buffer->nused; ++i)
+		{
+			if (resultRelInfo->ri_NumIndices > 0)
+			{
+				List	   *recheckIndexes;
+
+				recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+													   buffer->slots[i], miinfo->estate,
+													   false, NULL, NIL);
+				ExecARInsertTriggers(miinfo->estate, resultRelInfo,
+									 buffer->slots[i], recheckIndexes,
+									 mtstate->mt_transition_capture);
+				list_free(recheckIndexes);
+			}
+			else if (resultRelInfo->ri_TrigDesc != NULL &&
+					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+			{
+				ExecARInsertTriggers(miinfo->estate, resultRelInfo,
+									 buffer->slots[i], NIL,
+									 mtstate->mt_transition_capture);
+			}
+
+			ExecClearTuple(buffer->slots[i]);
+		}
+		buffer->nused = 0;
+	}
+
+	/* TODO: trim the buffer list back down to MAX_PARTITION_BUFFERS */
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInsert
  *
@@ -391,6 +451,7 @@ ExecInsert(ModifyTableState *mtstate,
 	ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+	TupleTableSlot *batchslot = NULL;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -631,6 +692,34 @@ ExecInsert(ModifyTableState *mtstate,
 
 			/* Since there was no insertion conflict, we're done */
 		}
+		else if (mtstate->miinfo != NULL &&
+			/* Has unsupported trigger? */
+				(resultRelInfo->ri_TrigDesc == NULL ||
+				 (
+				  // !resultRelInfo->ri_TrigDesc->trig_insert_before_row
+				  !resultRelInfo->ri_TrigDesc->trig_insert_after_row
+				  // && !resultRelInfo->ri_TrigDesc->trig_insert_instead_row
+				  && !resultRelInfo->ri_TrigDesc->trig_insert_after_statement
+				 )
+				) &&
+			/* Has Fdw? */
+				!resultRelInfo->ri_FdwRoutine
+				// !cstate->volatile_defexprs)
+				// !(contain_volatile_functions(cstate->whereClause))
+			)
+		{
+			if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
+				CopyMultiInsertInfoSetupBuffer(mtstate->miinfo,
+						resultRelInfo);
+
+			batchslot = CopyMultiInsertInfoNextFreeSlot(mtstate->miinfo, resultRelInfo);
+			ExecCopySlot(batchslot, slot);
+
+			CopyMultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot, 0, 0); // XXX: tuplen/lineno
+
+			if (CopyMultiInsertInfoIsFull(mtstate->miinfo))
+				InsertMultiInsertInfoFlush(mtstate->miinfo);
+		}
 		else
 		{
 			if (proute && mtstate->prevResultRelInfo != resultRelInfo)
@@ -640,6 +729,13 @@ ExecInsert(ModifyTableState *mtstate,
 				mtstate->prevResultRelInfo = resultRelInfo;
 			}
 
+			/*
+			 * Flush pending inserts if this partition can't use
+			 * batching, so rows are visible to triggers etc.
+			 */
+			if (mtstate->miinfo)
+				InsertMultiInsertInfoFlush(mtstate->miinfo);
+
 			/* insert the tuple normally */
 			table_tuple_insert(resultRelationDesc, slot,
 							   estate->es_output_cid,
@@ -659,32 +755,36 @@ ExecInsert(ModifyTableState *mtstate,
 		setLastTid(&slot->tts_tid);
 	}
 
-	/*
-	 * If this insert is the result of a partition key update that moved the
-	 * tuple to a new partition, put this row into the transition NEW TABLE,
-	 * if there is one. We need to do this separately for DELETE and INSERT
-	 * because they happen on different tables.
-	 */
-	ar_insert_trig_tcs = mtstate->mt_transition_capture;
-	if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
-		&& mtstate->mt_transition_capture->tcs_update_new_table)
+	/* Triggers were already run in the batch insert case */
+	if (batchslot == NULL)
 	{
-		ExecARUpdateTriggers(estate, resultRelInfo, NULL,
-							 NULL,
-							 slot,
-							 NULL,
-							 mtstate->mt_transition_capture);
-
 		/*
-		 * We've already captured the NEW TABLE row, so make sure any AR
-		 * INSERT trigger fired below doesn't capture it again.
+		 * If this insert is the result of a partition key update that moved the
+		 * tuple to a new partition, put this row into the transition NEW TABLE,
+		 * if there is one. We need to do this separately for DELETE and INSERT
+		 * because they happen on different tables.
 		 */
-		ar_insert_trig_tcs = NULL;
-	}
+		ar_insert_trig_tcs = mtstate->mt_transition_capture;
+		if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
+			&& mtstate->mt_transition_capture->tcs_update_new_table)
+		{
+			ExecARUpdateTriggers(estate, resultRelInfo, NULL,
+								 NULL,
+								 slot,
+								 NULL,
+								 mtstate->mt_transition_capture);
 
-	/* AFTER ROW INSERT Triggers */
-	ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
-						 ar_insert_trig_tcs);
+			/*
+			 * We've already captured the NEW TABLE row, so make sure any AR
+			 * INSERT trigger fired below doesn't capture it again.
+			 */
+			ar_insert_trig_tcs = NULL;
+		}
+
+		/* AFTER ROW INSERT Triggers */
+		ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+							 ar_insert_trig_tcs);
+	}
 
 	list_free(recheckIndexes);
 
@@ -2242,9 +2342,25 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans);
 	mtstate->mt_nplans = nplans;
 	mtstate->bistate = NULL;
+	mtstate->miinfo = NULL;
 	if (operation == CMD_INSERT && insert_in_bulk)
+	{
 		mtstate->bistate = GetBulkInsertState();
 
+		/*
+		 * For partitioned tables, don't use multi-inserts if the root table
+		 * has insert triggers that use transition tables (NEW TABLE).
+		 */
+		if (node->rootRelation == 0 ||
+			 !mtstate->rootResultRelInfo->ri_TrigDesc ||
+			 !mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_new_table)
+		{
+			mtstate->miinfo = palloc0(sizeof(CopyMultiInsertInfo));
+			CopyMultiInsertInfoInit(mtstate->miinfo, mtstate->rootResultRelInfo,
+					(void*)mtstate, estate, GetCurrentCommandId(true), 0);
+		}
+	}
+
 	/* set up epqstate with dummy subplan data for the moment */
 	EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam);
 	mtstate->fireBSTriggers = true;
@@ -2716,6 +2832,13 @@ ExecEndModifyTable(ModifyTableState *node)
 		table_finish_bulk_insert(node->rootResultRelInfo->ri_RelationDesc, 0);
 	}
 
+	if (node->miinfo)
+	{
+		if (!CopyMultiInsertInfoIsEmpty(node->miinfo))
+			 InsertMultiInsertInfoFlush(node->miinfo);
+		CopyMultiInsertInfoCleanup(node->miinfo);
+	}
+
 	/*
 	 * Close all the partitioned tables, leaf partitions, and their indices
 	 * and release the slot used for tuple routing, if set.
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..8b03f69cf5 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -14,6 +14,7 @@
 #ifndef COPY_H
 #define COPY_H
 
+#include "executor/executor.h"
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
@@ -41,4 +42,188 @@ extern uint64 CopyFrom(CopyState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+/*
+ * No more than this many tuples per CopyMultiInsertBuffer
+ *
+ * Caution: Don't make this too big, as we could end up with this many
+ * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
+ * multiInsertBuffers list.  Increasing this can cause quadratic growth in
+ * memory requirements during copies into partitioned tables with a large
+ * number of partitions.
+ */
+#define MAX_BUFFERED_TUPLES				1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size, of tuples stored.
+ */
+#define MAX_BUFFERED_BYTES				65535
+
+/* Trim the list of buffers back down to this number after flushing */
+#define MAX_PARTITION_BUFFERS			32
+
+/* Stores multi-insert data related to a single relation in CopyFrom/INSERT. */
+typedef struct CopyMultiInsertBuffer
+{
+	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
+	BulkInsertState bistate;		/* BulkInsertState for this rel */
+	int				nused;			/* number of 'slots' containing tuples */
+	uint64			linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
+							 * stream */
+} CopyMultiInsertBuffer;
+
+/*
+ * Stores one or many CopyMultiInsertBuffers and details about the size and
+ * number of tuples which are stored in them.  This allows multiple buffers to
+ * exist at once when COPYing into a partitioned table.
+ */
+typedef struct CopyMultiInsertInfo
+{
+	List		*multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
+	int			bufferedTuples; /* number of tuples buffered over all buffers */
+	int			bufferedBytes;  /* number of bytes from all buffered tuples */
+	CopyState	cstate;			/* Copy state for this CopyMultiInsertInfo (not used for insert) */
+	EState		*estate;		/* Executor state used for COPY/INSERT */
+	CommandId	mycid;			/* Command Id used for COPY/INSERT */
+	int			ti_options;	/* table insert options */
+} CopyMultiInsertInfo;
+
+void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						CopyState cstate, EState *estate, CommandId mycid,
+						int ti_options);
+void CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo);
+
+/*
+ * Allocate memory and initialize a new CopyMultiInsertBuffer for this
+ * ResultRelInfo.
+ */
+static inline CopyMultiInsertBuffer *
+CopyMultiInsertBufferInit(ResultRelInfo *rri)
+{
+	CopyMultiInsertBuffer *buffer;
+
+	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
+	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+	buffer->resultRelInfo = rri;
+	buffer->bistate = GetBulkInsertState();
+	buffer->nused = 0;
+
+	return buffer;
+}
+
+/*
+ * Make a new buffer for this ResultRelInfo.
+ */
+static inline void
+CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
+							   ResultRelInfo *rri)
+{
+	CopyMultiInsertBuffer *buffer;
+
+	buffer = CopyMultiInsertBufferInit(rri);
+
+	/* Setup back-link so we can easily find this buffer again */
+	rri->ri_CopyMultiInsertBuffer = buffer;
+	/* Record that we're tracking this buffer */
+	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+}
+
+/*
+ * Returns true if the buffers are full
+ */
+static inline bool
+CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
+{
+	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
+		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+		return true;
+	return false;
+}
+
+/*
+ * Returns true if we have no buffered tuples
+ */
+static inline bool
+CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
+{
+	return miinfo->bufferedTuples == 0;
+}
+
+/*
+ * Drop used slots and free member for this buffer.
+ *
+ * The buffer must be flushed before cleanup.
+ */
+static inline void
+CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
+							 CopyMultiInsertBuffer *buffer)
+{
+	int			i;
+
+	/* Ensure buffer was flushed */
+	Assert(buffer->nused == 0);
+
+	/* Remove back-link to ourself */
+	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+
+	FreeBulkInsertState(buffer->bistate);
+
+	/* Since we only create slots on demand, just drop the non-null ones. */
+	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+
+	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
+							 miinfo->ti_options);
+
+	pfree(buffer);
+}
+
+/*
+ * Get the TupleTableSlot that the next tuple should be stored in.
+ *
+ * Callers must ensure that rri has a buffer and that it is not full.
+ * 'miinfo' is unused but kept for consistency with related functions.
+ */
+static inline TupleTableSlot *
+CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
+								ResultRelInfo *rri)
+{
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+	int			nused;
+
+	/* Check the buffer before dereferencing it, so the Assert is useful. */
+	Assert(buffer != NULL);
+	nused = buffer->nused;
+	Assert(nused < MAX_BUFFERED_TUPLES);
+
+	if (buffer->slots[nused] == NULL)
+		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
+	return buffer->slots[nused];
+}
+
+/*
+ * Record the previously reserved TupleTableSlot that was reserved by
+ * CopyMultiInsertInfoNextFreeSlot as being consumed.
+ */
+static inline void
+CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						 TupleTableSlot *slot, int tuplen, uint64 lineno)
+{
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+
+	Assert(buffer != NULL);
+	Assert(slot == buffer->slots[buffer->nused]);
+
+	/* Store the line number so we can properly report any errors later */
+	buffer->linenos[buffer->nused] = lineno;
+
+	/* Record this slot as being used */
+	buffer->nused++;
+
+	/* Update how many tuples are stored and their size */
+	miinfo->bufferedTuples++;
+	miinfo->bufferedBytes += tuplen;
+}
+
 #endif							/* COPY_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e034562877..bcb9986013 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -40,8 +40,8 @@ struct ExprState;
 struct ExprContext;
 struct RangeTblEntry;			/* avoid including parsenodes.h here */
 struct ExprEvalStep;			/* avoid including execExpr.h everywhere */
 struct CopyMultiInsertBuffer;
-
+struct CopyMultiInsertInfo;
 
 /* ----------------
  *		ExprState node
@@ -499,7 +499,7 @@ typedef struct ResultRelInfo
 	 */
 	TupleConversionMap *ri_ChildToRootMap;
 
-	/* for use by copy.c when performing multi-inserts */
+	/* used by copy.c and ModifyTable when performing multi-inserts */
 	struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
 } ResultRelInfo;
 
@@ -1177,8 +1177,9 @@ typedef struct ModifyTableState
 	List	  **mt_arowmarks;	/* per-subplan ExecAuxRowMark lists */
 	EPQState	mt_epqstate;	/* for evaluating EvalPlanQual rechecks */
 	bool		fireBSTriggers; /* do we need to fire stmt triggers? */
-	BulkInsertState	bistate;	/* state for bulk insert like INSERT SELECT */
+	BulkInsertState	bistate;	/* bulk insert state if miinfo isn't usable */
 	ResultRelInfo	*prevResultRelInfo; /* last child inserted with bistate */
+	struct CopyMultiInsertInfo *miinfo;	/* multi-insert state, if used */
 
 	/*
 	 * Slot for storing tuples in the root partitioned table's rowtype during
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index da0dae6240..e0c83d7427 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -485,6 +485,22 @@ select count(1) from hash_parted;
 (1 row)
 
 commit;
+-- test that index was updated
+vacuum analyze hash_parted;
+explain(costs off)
+select * from hash_parted where a=13;
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Index Only Scan using hpart1_a_idx on hpart1 hash_parted
+   Index Cond: (a = 13)
+(2 rows)
+
+select * from hash_parted where a=13;
+ a  
+----
+ 13
+(1 row)
+
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index d3a94f053b..99ec18d9a2 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -292,6 +292,11 @@ select * from hash_parted;
 insert into hash_parted select generate_series(1,9999);
 select count(1) from hash_parted;
 commit;
+-- test that index was updated
+vacuum analyze hash_parted;
+explain(costs off)
+select * from hash_parted where a=13;
+select * from hash_parted where a=13;
 
 -- cleanup
 drop table range_parted, list_parted;
-- 
2.17.0

Reply via email to