On Thu, Dec 03, 2020 at 10:59:34AM +0530, Bharath Rupireddy wrote:
> On Wed, Dec 2, 2020 at 10:24 PM Justin Pryzby <pry...@telsasoft.com> wrote:
> >
> > One loose end in this patch is how to check for volatile default 
> > expressions.
> 
> I think we should be doing all the necessary checks in the planner and
> have a flag in the planned stmt to indicate whether to go with multi
> insert or not. For the required checks, we can have a look at how the
> existing COPY decides to go with either CIM_MULTI or CIM_SINGLE.

Yes, you can see that I've copied the checks from COPY.
As in COPY, some checks are done once, in ExecInitModifyTable, outside of the
ExecModifyTable "loop".
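
To illustrate, here's a condensed sketch of that one-time check as it ends up in
ExecInitModifyTable (simplified from the attached patch; the partitioned-table
statement-trigger and volatile-default cases are left out):

	if (operation != CMD_INSERT ||
		node->onConflictAction != ONCONFLICT_NONE ||
		(mtstate->rootResultRelInfo->ri_TrigDesc != NULL &&
		 (mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_before_row ||
		  mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
		mtstate->rootResultRelInfo->ri_FdwRoutine != NULL)
		mtstate->miinfo = NULL;		/* fall back to plain single inserts */
	else
	{
		/* the patch currently uses calloc() here; palloc0 is the usual choice */
		mtstate->miinfo = palloc0(sizeof(*mtstate->miinfo));
		MultiInsertInfoInit(mtstate->miinfo, mtstate->rootResultRelInfo,
							mtstate->mt_transition_capture,
							estate, GetCurrentCommandId(true), 0);
	}

The per-row code in ExecInsert then only needs to look at the target partition's
triggers and FDW routine before deciding to buffer a tuple.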

This version squishes some commits together, uses bistate for ON CONFLICT, and
attempts to use the slot's memory context allocation as the tuple size.

For the bufferedBytes check, I'm not sure what's best.  COPY flushes buffers
after 65kB of accumulated input line length, but that's totally different from
the tuple slot memory context size, which is what I used for INSERT.  Maybe COPY
should also use the slot size?  Or maybe the threshold to flush needs to be set
in miinfo, rather than a #define, and differ between COPY and INSERT.
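
In case it helps the discussion, a rough sketch of that last idea (hypothetical,
not part of the attached patches) would be to make the limits MultiInsertInfo
fields, set by MultiInsertInfoInit() from its caller:

	/* hypothetical new MultiInsertInfo fields */
	int			maxBufferedTuples;	/* flush after this many buffered tuples */
	int			maxBufferedBytes;	/* flush after this many buffered bytes */

	static inline bool
	MultiInsertInfoIsFull(MultiInsertInfo *miinfo)
	{
		return miinfo->bufferedTuples >= miinfo->maxBufferedTuples ||
			   miinfo->bufferedBytes >= miinfo->maxBufferedBytes;
	}

CopyFrom() could then keep passing a limit based on input line length, while
INSERT passes one based on the slot memory context size, without the two having
to share a #define.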

-- 
Justin
>From f83313efc8612a5e94f1f13a87d80fb0c393c7b0 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Fri, 8 May 2020 02:17:32 -0500
Subject: [PATCH v8 1/4] INSERT SELECT to use BulkInsertState and multi_insert

Rename structures;
Move the MultiInsert functions from copyfrom.c to (tentatively) nodeModifyTable.h;
Move transition_capture and cur_lineno into MultiInsertInfo (via cstate->miinfo);

Dynamically switch to multi-insert mode based on the number of tuples inserted.
This is intended to accommodate: 1) the original use case of INSERT using a small
ring buffer to avoid leaving behind dirty buffers; 2) automatically using
multi-inserts for batch operations; and 3) allowing the old behavior of leaving
behind dirty buffers, which might let INSERT run more quickly, at the cost of
other backends having to write those buffers out.

XXX: for (1), the bulk-insert state is used even when not in multi-insert mode,
including for a VALUES list.

TODO: use cstate->miinfo.cur_lineno++ instead of mtstate->miinfo->ntuples
---
 src/backend/commands/copyfrom.c          | 394 +----------------------
 src/backend/commands/copyfromparse.c     |  10 +-
 src/backend/executor/execMain.c          |   2 +-
 src/backend/executor/execPartition.c     |   2 +-
 src/backend/executor/nodeModifyTable.c   | 196 +++++++++--
 src/backend/utils/misc/guc.c             |  10 +
 src/include/commands/copyfrom_internal.h |   5 +-
 src/include/executor/nodeModifyTable.h   | 370 +++++++++++++++++++++
 src/include/nodes/execnodes.h            |  16 +-
 src/test/regress/expected/insert.out     |  43 +++
 src/test/regress/sql/insert.sql          |  20 ++
 src/tools/pgindent/typedefs.list         |   4 +-
 12 files changed, 657 insertions(+), 415 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1b14e9a6eb..c4fe75df8e 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -44,54 +44,6 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list.  Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES		1000
-
-/*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
- */
-#define MAX_BUFFERED_BYTES		65535
-
-/* Trim the list of buffers back down to this number after flushing */
-#define MAX_PARTITION_BUFFERS	32
-
-/* Stores multi-insert data related to a single relation in CopyFrom. */
-typedef struct CopyMultiInsertBuffer
-{
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
-	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel */
-	int			nused;			/* number of 'slots' containing tuples */
-	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
-												 * stream */
-} CopyMultiInsertBuffer;
-
-/*
- * Stores one or many CopyMultiInsertBuffers and details about the size and
- * number of tuples which are stored in them.  This allows multiple buffers to
- * exist at once when COPYing into a partitioned table.
- */
-typedef struct CopyMultiInsertInfo
-{
-	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
-	int			bufferedTuples; /* number of tuples buffered over all buffers */
-	int			bufferedBytes;	/* number of bytes from all buffered tuples */
-	CopyFromState	cstate;			/* Copy state for this CopyMultiInsertInfo */
-	EState	   *estate;			/* Executor state used for COPY */
-	CommandId	mycid;			/* Command Id used for COPY */
-	int			ti_options;		/* table insert options */
-} CopyMultiInsertInfo;
-
-
 /* non-export function prototypes */
 static char *limit_printout_length(const char *str);
 
@@ -109,7 +61,7 @@ CopyFromErrorCallback(void *arg)
 	char		curlineno_str[32];
 
 	snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
-			 cstate->cur_lineno);
+			 cstate->miinfo.cur_lineno);
 
 	if (cstate->opts.binary)
 	{
@@ -204,317 +156,6 @@ limit_printout_length(const char *str)
 	return res;
 }
 
-/*
- * Allocate memory and initialize a new CopyMultiInsertBuffer for this
- * ResultRelInfo.
- */
-static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
-
-	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
-	buffer->resultRelInfo = rri;
-	buffer->bistate = GetBulkInsertState();
-	buffer->nused = 0;
-
-	return buffer;
-}
-
-/*
- * Make a new buffer for this ResultRelInfo.
- */
-static inline void
-CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
-
-	buffer = CopyMultiInsertBufferInit(rri);
-
-	/* Setup back-link so we can easily find this buffer again */
-	rri->ri_CopyMultiInsertBuffer = buffer;
-	/* Record that we're tracking this buffer */
-	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-}
-
-/*
- * Initialize an already allocated CopyMultiInsertInfo.
- *
- * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
- * for that table.
- */
-static void
-CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						CopyFromState cstate, EState *estate, CommandId mycid,
-						int ti_options)
-{
-	miinfo->multiInsertBuffers = NIL;
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
-	miinfo->cstate = cstate;
-	miinfo->estate = estate;
-	miinfo->mycid = mycid;
-	miinfo->ti_options = ti_options;
-
-	/*
-	 * Only setup the buffer when not dealing with a partitioned table.
-	 * Buffers for partitioned tables will just be setup when we need to send
-	 * tuples their way for the first time.
-	 */
-	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
-}
-
-/*
- * Returns true if the buffers are full
- */
-static inline bool
-CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
-{
-	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
-		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
-		return true;
-	return false;
-}
-
-/*
- * Returns true if we have no buffered tuples
- */
-static inline bool
-CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
-{
-	return miinfo->bufferedTuples == 0;
-}
-
-/*
- * Write the tuples stored in 'buffer' out to the table.
- */
-static inline void
-CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
-						   CopyMultiInsertBuffer *buffer)
-{
-	MemoryContext oldcontext;
-	int			i;
-	uint64		save_cur_lineno;
-	CopyFromState	cstate = miinfo->cstate;
-	EState	   *estate = miinfo->estate;
-	CommandId	mycid = miinfo->mycid;
-	int			ti_options = miinfo->ti_options;
-	bool		line_buf_valid = cstate->line_buf_valid;
-	int			nused = buffer->nused;
-	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
-	TupleTableSlot **slots = buffer->slots;
-
-	/*
-	 * Print error context information correctly, if one of the operations
-	 * below fail.
-	 */
-	cstate->line_buf_valid = false;
-	save_cur_lineno = cstate->cur_lineno;
-
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
-	{
-		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
-		 */
-		if (resultRelInfo->ri_NumIndices > 0)
-		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = buffer->linenos[i];
-			recheckIndexes =
-				ExecInsertIndexTuples(resultRelInfo,
-									  buffer->slots[i], estate, false, NULL,
-									  NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
-								 cstate->transition_capture);
-			list_free(recheckIndexes);
-		}
-
-		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
-		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-		{
-			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
-		}
-
-		ExecClearTuple(slots[i]);
-	}
-
-	/* Mark that all slots are free */
-	buffer->nused = 0;
-
-	/* reset cur_lineno and line_buf_valid to what they were */
-	cstate->line_buf_valid = line_buf_valid;
-	cstate->cur_lineno = save_cur_lineno;
-}
-
-/*
- * Drop used slots and free member for this buffer.
- *
- * The buffer must be flushed before cleanup.
- */
-static inline void
-CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
-							 CopyMultiInsertBuffer *buffer)
-{
-	int			i;
-
-	/* Ensure buffer was flushed */
-	Assert(buffer->nused == 0);
-
-	/* Remove back-link to ourself */
-	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
-
-	FreeBulkInsertState(buffer->bistate);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
-
-	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-							 miinfo->ti_options);
-
-	pfree(buffer);
-}
-
-/*
- * Write out all stored tuples in all buffers out to the tables.
- *
- * Once flushed we also trim the tracked buffers list down to size by removing
- * the buffers created earliest first.
- *
- * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
- * used.  When cleaning up old buffers we'll never remove the one for
- * 'curr_rri'.
- */
-static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
-{
-	ListCell   *lc;
-
-	foreach(lc, miinfo->multiInsertBuffers)
-	{
-		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
-
-		CopyMultiInsertBufferFlush(miinfo, buffer);
-	}
-
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
-
-	/*
-	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
-	 * remove buffers starting with the ones we created first.  It seems less
-	 * likely that these older ones will be needed than the ones that were
-	 * just created.
-	 */
-	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
-	{
-		CopyMultiInsertBuffer *buffer;
-
-		buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
-
-		/*
-		 * We never want to remove the buffer that's currently being used, so
-		 * if we happen to find that then move it to the end of the list.
-		 */
-		if (buffer->resultRelInfo == curr_rri)
-		{
-			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-			buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
-		}
-
-		CopyMultiInsertBufferCleanup(miinfo, buffer);
-		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-	}
-}
-
-/*
- * Cleanup allocated buffers and free memory
- */
-static inline void
-CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
-{
-	ListCell   *lc;
-
-	foreach(lc, miinfo->multiInsertBuffers)
-		CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
-
-	list_free(miinfo->multiInsertBuffers);
-}
-
-/*
- * Get the next TupleTableSlot that the next tuple should be stored in.
- *
- * Callers must ensure that the buffer is not full.
- *
- * Note: 'miinfo' is unused but has been included for consistency with the
- * other functions in this area.
- */
-static inline TupleTableSlot *
-CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
-								ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-	int			nused = buffer->nused;
-
-	Assert(buffer != NULL);
-	Assert(nused < MAX_BUFFERED_TUPLES);
-
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
-}
-
-/*
- * Record the previously reserved TupleTableSlot that was reserved by
- * CopyMultiInsertInfoNextFreeSlot as being consumed.
- */
-static inline void
-CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						 TupleTableSlot *slot, int tuplen, uint64 lineno)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-
-	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
-
-	/* Store the line number so we can properly report any errors later */
-	buffer->linenos[buffer->nused] = lineno;
-
-	/* Record this slot as being used */
-	buffer->nused++;
-
-	/* Update how many tuples are stored and their size */
-	miinfo->bufferedTuples++;
-	miinfo->bufferedBytes += tuplen;
-}
-
 /*
  * Copy FROM file to relation.
  */
@@ -536,7 +177,6 @@ CopyFrom(CopyFromState cstate)
 	int			ti_options = 0; /* start with default options for insert */
 	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
-	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
 	uint64		processed = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
@@ -723,7 +363,7 @@ CopyFrom(CopyFromState cstate)
 		 * For partitioned tables we can't support multi-inserts when there
 		 * are any statement level insert triggers. It might be possible to
 		 * allow partitioned tables with such triggers in the future, but for
-		 * now, CopyMultiInsertInfoFlush expects that any before row insert
+		 * now, MultiInsertInfoFlush expects that any before row insert
 		 * and statement level insert triggers are on the same relation.
 		 */
 		insertMethod = CIM_SINGLE;
@@ -771,7 +411,8 @@ CopyFrom(CopyFromState cstate)
 		else
 			insertMethod = CIM_MULTI;
 
-		CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
+		MultiInsertInfoInit(&cstate->miinfo, resultRelInfo,
+								cstate->transition_capture,
 								estate, mycid, ti_options);
 	}
 
@@ -834,7 +475,7 @@ CopyFrom(CopyFromState cstate)
 			Assert(resultRelInfo == target_resultRelInfo);
 			Assert(insertMethod == CIM_MULTI);
 
-			myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+			myslot = MultiInsertInfoNextFreeSlot(&cstate->miinfo,
 													 resultRelInfo);
 		}
 
@@ -903,18 +544,18 @@ CopyFrom(CopyFromState cstate)
 				/* Set the multi-insert buffer to use for this partition. */
 				if (leafpart_use_multi_insert)
 				{
-					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
-						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
+					if (resultRelInfo->ri_MultiInsertBuffer == NULL)
+						MultiInsertInfoSetupBuffer(&cstate->miinfo,
 													   resultRelInfo);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
-						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+						 !MultiInsertInfoIsEmpty(&cstate->miinfo))
 				{
 					/*
 					 * Flush pending inserts if this partition can't use
 					 * batching, so rows are visible to triggers etc.
 					 */
-					CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					MultiInsertInfoFlush(&cstate->miinfo, resultRelInfo);
 				}
 
 				if (bistate != NULL)
@@ -960,7 +601,7 @@ CopyFrom(CopyFromState cstate)
 				/* no other path available for partitioned table */
 				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
 
-				batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+				batchslot = MultiInsertInfoNextFreeSlot(&cstate->miinfo,
 															resultRelInfo);
 
 				if (map != NULL)
@@ -1040,17 +681,17 @@ CopyFrom(CopyFromState cstate)
 					ExecMaterializeSlot(myslot);
 
 					/* Add this tuple to the tuple buffer */
-					CopyMultiInsertInfoStore(&multiInsertInfo,
+					MultiInsertInfoStore(&cstate->miinfo,
 											 resultRelInfo, myslot,
 											 cstate->line_buf.len,
-											 cstate->cur_lineno);
+											 cstate->miinfo.cur_lineno);
 
 					/*
 					 * If enough inserts have queued up, then flush all
 					 * buffers out to their tables.
 					 */
-					if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
-						CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					if (MultiInsertInfoIsFull(&cstate->miinfo))
+						MultiInsertInfoFlush(&cstate->miinfo, resultRelInfo);
 				}
 				else
 				{
@@ -1109,8 +750,8 @@ CopyFrom(CopyFromState cstate)
 	/* Flush any remaining buffered tuples */
 	if (insertMethod != CIM_SINGLE)
 	{
-		if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
-			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+		if (!MultiInsertInfoIsEmpty(&cstate->miinfo))
+			MultiInsertInfoFlush(&cstate->miinfo, NULL);
 	}
 
 	/* Done, clean up */
@@ -1144,7 +785,7 @@ CopyFrom(CopyFromState cstate)
 
 	/* Tear down the multi-insert buffer data */
 	if (insertMethod != CIM_SINGLE)
-		CopyMultiInsertInfoCleanup(&multiInsertInfo);
+		MultiInsertInfoCleanup(&cstate->miinfo);
 
 	/* Close all the partitioned tables, leaf partitions, and their indices */
 	if (proute)
@@ -1323,7 +964,6 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->reached_eof = false;
 	cstate->eol_type = EOL_UNKNOWN;
 	cstate->cur_relname = RelationGetRelationName(cstate->rel);
-	cstate->cur_lineno = 0;
 	cstate->cur_attname = NULL;
 	cstate->cur_attval = NULL;
 
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 34ed3cfcd5..606268be04 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -456,14 +456,14 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
 	Assert(!cstate->opts.binary);
 
 	/* on input just throw the header line away */
-	if (cstate->cur_lineno == 0 && cstate->opts.header_line)
+	if (cstate->miinfo.cur_lineno == 0 && cstate->opts.header_line)
 	{
-		cstate->cur_lineno++;
+		cstate->miinfo.cur_lineno++;
 		if (CopyReadLine(cstate))
 			return false;		/* done */
 	}
 
-	cstate->cur_lineno++;
+	cstate->miinfo.cur_lineno++;
 
 	/* Actually read the line into memory here */
 	done = CopyReadLine(cstate);
@@ -604,7 +604,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 		int16		fld_count;
 		ListCell   *cur;
 
-		cstate->cur_lineno++;
+		cstate->miinfo.cur_lineno++;
 
 		if (!CopyGetInt16(cstate, &fld_count))
 		{
@@ -912,7 +912,7 @@ CopyReadLineText(CopyFromState cstate)
 			 * at all --- is cur_lineno a physical or logical count?)
 			 */
 			if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
-				cstate->cur_lineno++;
+				cstate->miinfo.cur_lineno++;
 		}
 
 		/* Process \r */
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 7179f589f9..855a89b570 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1247,7 +1247,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
 													 * ExecInitRoutingInfo */
 	resultRelInfo->ri_PartitionTupleSlot = NULL;	/* ditto */
 	resultRelInfo->ri_ChildToRootMap = NULL;
-	resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+	resultRelInfo->ri_MultiInsertBuffer = NULL;
 }
 
 /*
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 86594bd056..1f8ba785db 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -994,7 +994,7 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
 		partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
 		partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
 
-	partRelInfo->ri_CopyMultiInsertBuffer = NULL;
+	partRelInfo->ri_MultiInsertBuffer = NULL;
 
 	/*
 	 * Keep track of it in the PartitionTupleRouting->partitions array.
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index e0f24283b8..3428d9f48a 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -43,6 +43,7 @@
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "commands/trigger.h"
+#include "commands/copy.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
@@ -72,6 +73,8 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
 											   ResultRelInfo *targetRelInfo,
 											   TupleTableSlot *slot,
 											   ResultRelInfo **partRelInfo);
+/* guc */
+int bulk_insert_ntuples = 1000;
 
 /*
  * Verify that the tuples to be produced by INSERT or UPDATE match the
@@ -389,6 +392,8 @@ ExecInsert(ModifyTableState *mtstate,
 	ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+	TupleTableSlot *batchslot = NULL;
+	bool	use_multi_insert = false;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -408,6 +413,66 @@ ExecInsert(ModifyTableState *mtstate,
 
 	resultRelationDesc = resultRelInfo->ri_RelationDesc;
 
+	/* Use bulk insert after a threshold number of tuples */
+	// XXX: maybe this should only be done if it's not a partitioned table or
+	// if the partitions don't support miinfo, which uses its own bistates
+	mtstate->ntuples++;
+	if (mtstate->bistate == NULL &&
+			mtstate->operation == CMD_INSERT &&
+			mtstate->ntuples > bulk_insert_ntuples &&
+			bulk_insert_ntuples >= 0)
+	{
+		elog(DEBUG1, "enabling bulk insert");
+		mtstate->bistate = GetBulkInsertState();
+	}
+
+	if (!mtstate->miinfo)
+	{
+		/*
+		 * Multi-inserts aren't possible for this statement at all, so don't
+		 * check further.
+		 */
+	} else if (proute == NULL)
+	{
+		if (mtstate->miinfo->ntuples++ >= bulk_insert_ntuples &&
+			bulk_insert_ntuples >= 0)
+			use_multi_insert = true;
+	}
+	else
+	{
+		/*
+		 * If a partitioned table itself allows multi-insert, and bistate
+		 * indicates we've inserted the threshold number of tuples, check if
+		 * the partition also supports it.
+		 */
+
+		/* Determine which triggers exist on this partition */
+		// XXX copyfrom.c only checks triggers when the partition changes,
+		// so maybe use_multi_insert should be in mtstate ?
+		bool has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+				resultRelInfo->ri_TrigDesc->trig_insert_before_row);
+
+		bool has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+				resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
+
+		/*
+		 * Disable multi-inserts when the partition has BEFORE/INSTEAD
+		 * OF triggers, or if the partition is a foreign partition.
+		 * The number of tuples eligible for multi-insert is tracked separately
+		 * from the total number of tuples in case it's not supported for some
+		 * partitions.
+		 */
+		if (!has_before_insert_row_trig &&
+			!has_instead_insert_row_trig &&
+			resultRelInfo->ri_FdwRoutine == NULL &&
+			mtstate->miinfo->ntuples++ >= bulk_insert_ntuples &&
+			bulk_insert_ntuples >= 0)
+			use_multi_insert = true;
+	}
+
+	if (use_multi_insert && mtstate->miinfo->ntuples - 1 == bulk_insert_ntuples)
+		elog(DEBUG1, "enabling multi insert");
+
 	/*
 	 * BEFORE ROW INSERT Triggers.
 	 *
@@ -594,7 +659,7 @@ ExecInsert(ModifyTableState *mtstate,
 			table_tuple_insert_speculative(resultRelationDesc, slot,
 										   estate->es_output_cid,
 										   0,
-										   NULL,
+										   mtstate->bistate,
 										   specToken);
 
 			/* insert index entries for tuple */
@@ -629,12 +694,39 @@ ExecInsert(ModifyTableState *mtstate,
 
 			/* Since there was no insertion conflict, we're done */
 		}
+		else if (use_multi_insert)
+		{
+			if (resultRelInfo->ri_MultiInsertBuffer == NULL)
+				MultiInsertInfoSetupBuffer(mtstate->miinfo, resultRelInfo);
+
+			batchslot = MultiInsertInfoNextFreeSlot(mtstate->miinfo, resultRelInfo);
+			ExecCopySlot(batchslot, slot);
+
+			MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot, 0, 0); // XXX: tuplen/lineno
+
+			if (MultiInsertInfoIsFull(mtstate->miinfo))
+				MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo);
+		}
 		else
 		{
+			if (proute && mtstate->prevResultRelInfo != resultRelInfo)
+			{
+				if (mtstate->bistate)
+					ReleaseBulkInsertStatePin(mtstate->bistate);
+				mtstate->prevResultRelInfo = resultRelInfo;
+			}
+
+			/*
+			 * Flush pending inserts if this partition can't use
+			 * batching, so rows are visible to triggers etc.
+			 */
+			if (mtstate->miinfo)
+				MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo);
+
 			/* insert the tuple normally */
 			table_tuple_insert(resultRelationDesc, slot,
 							   estate->es_output_cid,
-							   0, NULL);
+							   0, mtstate->bistate);
 
 			/* insert index entries for tuple */
 			if (resultRelInfo->ri_NumIndices > 0)
@@ -647,32 +739,36 @@ ExecInsert(ModifyTableState *mtstate,
 	if (canSetTag)
 		(estate->es_processed)++;
 
-	/*
-	 * If this insert is the result of a partition key update that moved the
-	 * tuple to a new partition, put this row into the transition NEW TABLE,
-	 * if there is one. We need to do this separately for DELETE and INSERT
-	 * because they happen on different tables.
-	 */
-	ar_insert_trig_tcs = mtstate->mt_transition_capture;
-	if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
-		&& mtstate->mt_transition_capture->tcs_update_new_table)
+	/* Triggers were already run in the batch insert case */
+	if (batchslot == NULL)
 	{
-		ExecARUpdateTriggers(estate, resultRelInfo, NULL,
-							 NULL,
-							 slot,
-							 NULL,
-							 mtstate->mt_transition_capture);
-
 		/*
-		 * We've already captured the NEW TABLE row, so make sure any AR
-		 * INSERT trigger fired below doesn't capture it again.
+		 * If this insert is the result of a partition key update that moved the
+		 * tuple to a new partition, put this row into the transition NEW TABLE,
+		 * if there is one. We need to do this separately for DELETE and INSERT
+		 * because they happen on different tables.
 		 */
-		ar_insert_trig_tcs = NULL;
-	}
+		ar_insert_trig_tcs = mtstate->mt_transition_capture;
+		if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
+			&& mtstate->mt_transition_capture->tcs_update_new_table)
+		{
+			ExecARUpdateTriggers(estate, resultRelInfo, NULL,
+								 NULL,
+								 slot,
+								 NULL,
+								 mtstate->mt_transition_capture);
+
+			/*
+			 * We've already captured the NEW TABLE row, so make sure any AR
+			 * INSERT trigger fired below doesn't capture it again.
+			 */
+			ar_insert_trig_tcs = NULL;
+		}
 
-	/* AFTER ROW INSERT Triggers */
-	ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
-						 ar_insert_trig_tcs);
+		/* AFTER ROW INSERT Triggers */
+		ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+							 ar_insert_trig_tcs);
+	}
 
 	list_free(recheckIndexes);
 
@@ -2229,6 +2325,45 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
 	mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans);
 	mtstate->mt_nplans = nplans;
+	mtstate->bistate = NULL;
+
+	/*
+	 * Set miinfo if it can support multi-insert. This is the equivalent of
+	 * Set up miinfo if the statement can support multi-inserts.  This is the
+	 * equivalent of CIM_MULTI_* et al in copyfrom.c.
+
+	if (operation != CMD_INSERT ||
+			node->onConflictAction != ONCONFLICT_NONE)
+		mtstate->miinfo = NULL;
+	else if (mtstate->rootResultRelInfo->ri_TrigDesc != NULL &&
+			(mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+			 // mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_after_row || // XXX or any row level triggers at all?
+			 mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_instead_row))
+		/*
+		 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
+		 * triggers on the table.
+		 */
+		mtstate->miinfo = NULL;
+	else if (node->rootRelation > 0 &&
+			mtstate->rootResultRelInfo->ri_TrigDesc != NULL &&
+			 mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_new_table)
+		/*
+		 * For partitioned tables we can't support multi-inserts when there
+		 * are any statement level insert triggers.
+		 */
+		mtstate->miinfo = NULL;
+	else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL
+			/* || cstate->volatile_defexprs */ )
+		// XXX contain_volatile_functions_not_nextval((Node *) defexpr);
+		/* Can't support multi-inserts to foreign tables or if there are any */
+		mtstate->miinfo = NULL;
+	else
+	{
+		mtstate->miinfo = calloc(sizeof(*mtstate->miinfo), 1);
+		MultiInsertInfoInit(mtstate->miinfo, mtstate->rootResultRelInfo,
+				mtstate->mt_transition_capture,
+				estate, GetCurrentCommandId(true), 0);
+	}
 
 	/* set up epqstate with dummy subplan data for the moment */
 	EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam);
@@ -2695,6 +2830,19 @@ ExecEndModifyTable(ModifyTableState *node)
 														   resultRelInfo);
 	}
 
+	if (node->bistate)
+	{
+		FreeBulkInsertState(node->bistate);
+		table_finish_bulk_insert(node->rootResultRelInfo->ri_RelationDesc, 0);
+	}
+
+	if (node->miinfo)
+	{
+		if (!MultiInsertInfoIsEmpty(node->miinfo))
+			 MultiInsertInfoFlush(node->miinfo, node->resultRelInfo); // root ?
+		MultiInsertInfoCleanup(node->miinfo);
+	}
+
 	/*
 	 * Close all the partitioned tables, leaf partitions, and their indices
 	 * and release the slot used for tuple routing, if set.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 635d91d50a..1401217616 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -54,6 +54,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/cost.h"
+#include "executor/nodeModifyTable.h"
 #include "optimizer/geqo.h"
 #include "optimizer/optimizer.h"
 #include "optimizer/paths.h"
@@ -3399,6 +3400,15 @@ static struct config_int ConfigureNamesInt[] =
 		check_huge_page_size, NULL, NULL
 	},
 
+	{
+		{"bulk_insert_ntuples", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Enable bulk insertions after this number of tuples."),
+			gettext_noop("A ring buffer of limited size will be used, and inserts will be done in batches."),
+		},
+		&bulk_insert_ntuples,
+		1000, -1, INT_MAX,
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index c15ea803c3..c0603e13ea 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -15,6 +15,7 @@
 #define COPYFROM_INTERNAL_H
 
 #include "commands/copy.h"
+#include "executor/nodeModifyTable.h"
 #include "commands/trigger.h"
 
 /*
@@ -92,10 +93,12 @@ typedef struct CopyFromStateData
 
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
-	uint64		cur_lineno;		/* line number for error messages */
 	const char *cur_attname;	/* current att for error messages */
 	const char *cur_attval;		/* current att value for error messages */
 
+	/* For bulk inserts and for error callback */
+	MultiInsertInfo miinfo;
+
 	/*
 	 * Working state
 	 */
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 46a2dc9511..71de7cf80e 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -13,8 +13,12 @@
 #ifndef NODEMODIFYTABLE_H
 #define NODEMODIFYTABLE_H
 
+#include "commands/trigger.h"
+#include "executor/executor.h" // XXX
 #include "nodes/execnodes.h"
 
+extern PGDLLIMPORT int bulk_insert_ntuples;
+
 extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 									   EState *estate, TupleTableSlot *slot,
 									   CmdType cmdtype);
@@ -23,4 +27,370 @@ extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate,
 extern void ExecEndModifyTable(ModifyTableState *node);
 extern void ExecReScanModifyTable(ModifyTableState *node);
 
+/* Bulk insert stuff which used to live in copy.c */
+
+/*
+ * No more than this many tuples per MultiInsertBuffer
+ *
+ * Caution: Don't make this too big, as we could end up with this many
+ * MultiInsertBuffer items stored in MultiInsertInfo's
+ * multiInsertBuffers list.  Increasing this can cause quadratic growth in
+ * memory requirements during copies into partitioned tables with a large
+ * number of partitions.
+ */
+#define MAX_BUFFERED_TUPLES		1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size, of tuples stored.
+ */
+#define MAX_BUFFERED_BYTES		65535
+
+/* Trim the list of buffers back down to this number after flushing */
+#define MAX_PARTITION_BUFFERS	32
+
+/* Stores multi-insert data related to a single relation in CopyFrom. */
+typedef struct MultiInsertBuffer
+{
+	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
+	BulkInsertState bistate;	/* BulkInsertState for this rel */
+	int			nused;			/* number of 'slots' containing tuples */
+	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
+												 * stream */
+} MultiInsertBuffer;
+
+/*
+ * Stores one or many MultiInsertBuffers and details about the size and
+ * number of tuples which are stored in them.  This allows multiple buffers to
+ * exist at once when COPY/INSERTing into a partitioned table.
+ */
+typedef struct MultiInsertInfo
+{
+	List	   *multiInsertBuffers; /* List of tracked MultiInsertBuffers */
+	int			bufferedTuples; /* number of tuples buffered over all buffers */
+	int			bufferedBytes;	/* number of bytes from all buffered tuples */
+	TransitionCaptureState	*transition_capture;
+	EState	   *estate;			/* Executor state */
+	CommandId	mycid;			/* Command Id */
+	int			ti_options;		/* table insert options */
+	size_t		ntuples;		/* Number of rows *eligible* for multi-insert */
+
+	/* Line number for errors in copyfrom.c */
+	uint64		cur_lineno;
+	bool		line_buf_valid;
+} MultiInsertInfo;
+
+
+/*
+ * Allocate memory and initialize a new MultiInsertBuffer for this
+ * ResultRelInfo.
+ */
+static MultiInsertBuffer *
+MultiInsertBufferInit(ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer;
+
+	buffer = (MultiInsertBuffer *) palloc(sizeof(MultiInsertBuffer));
+	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+	buffer->resultRelInfo = rri;
+	buffer->bistate = GetBulkInsertState();
+	buffer->nused = 0;
+
+	return buffer;
+}
+
+/*
+ * Make a new buffer for this ResultRelInfo.
+ */
+static inline void
+MultiInsertInfoSetupBuffer(MultiInsertInfo *miinfo,
+							   ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer;
+
+	buffer = MultiInsertBufferInit(rri);
+
+	/* Setup back-link so we can easily find this buffer again */
+	rri->ri_MultiInsertBuffer = buffer;
+	/* Record that we're tracking this buffer */
+	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+}
+
+/*
+ * Initialize an already allocated MultiInsertInfo.
+ *
+ * If rri is a non-partitioned table then a MultiInsertBuffer is set up
+ * for that table.
+ */
+static inline void
+MultiInsertInfoInit(MultiInsertInfo *miinfo, ResultRelInfo *rri,
+						TransitionCaptureState *transition_capture,
+						EState *estate, CommandId mycid, int ti_options)
+{
+	miinfo->multiInsertBuffers = NIL;
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+	miinfo->transition_capture = transition_capture;
+	miinfo->estate = estate;
+	miinfo->mycid = mycid;
+	miinfo->ti_options = ti_options;
+	miinfo->cur_lineno = 0;
+
+	/*
+	 * Only setup the buffer when not dealing with a partitioned table.
+	 * Buffers for partitioned tables will just be setup when we need to send
+	 * tuples their way for the first time.
+	 */
+	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		MultiInsertInfoSetupBuffer(miinfo, rri);
+}
+
+/*
+ * Returns true if the buffers are full
+ */
+static inline bool
+MultiInsertInfoIsFull(MultiInsertInfo *miinfo)
+{
+	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
+		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+		return true;
+	return false;
+}
+
+/*
+ * Returns true if we have no buffered tuples
+ */
+static inline bool
+MultiInsertInfoIsEmpty(MultiInsertInfo *miinfo)
+{
+	return miinfo->bufferedTuples == 0;
+}
+
+/*
+ * Write the tuples stored in 'buffer' out to the table.
+ */
+static inline void
+MultiInsertBufferFlush(MultiInsertInfo *miinfo,
+						   MultiInsertBuffer *buffer)
+{
+	MemoryContext oldcontext;
+	int			i;
+	uint64		save_cur_lineno;
+	EState	   *estate = miinfo->estate;
+	CommandId	mycid = miinfo->mycid;
+	int			ti_options = miinfo->ti_options;
+	bool		line_buf_valid = miinfo->line_buf_valid;
+	int			nused = buffer->nused;
+	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
+	TupleTableSlot **slots = buffer->slots;
+
+	/*
+	 * Print error context information correctly, if one of the operations
+	 * below fail.
+	 */
+	miinfo->line_buf_valid = false;
+	save_cur_lineno = miinfo->cur_lineno;
+
+	/*
+	 * table_multi_insert may leak memory, so switch to short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); // XXX requires executor.h
+	table_multi_insert(resultRelInfo->ri_RelationDesc,
+					   slots,
+					   nused,
+					   mycid,
+					   ti_options,
+					   buffer->bistate);
+	MemoryContextSwitchTo(oldcontext);
+
+	for (i = 0; i < nused; i++)
+	{
+		/*
+		 * If there are any indexes, update them for all the inserted tuples,
+		 * and run AFTER ROW INSERT triggers.
+		 */
+		if (resultRelInfo->ri_NumIndices > 0)
+		{
+			List	   *recheckIndexes;
+
+			miinfo->cur_lineno = buffer->linenos[i];
+			recheckIndexes =
+				ExecInsertIndexTuples(resultRelInfo,
+									  buffer->slots[i], estate, false, NULL,
+									  NIL);
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], recheckIndexes,
+								 miinfo->transition_capture);
+			list_free(recheckIndexes);
+		}
+
+		/*
+		 * There's no indexes, but see if we need to run AFTER ROW INSERT
+		 * triggers anyway.
+		 */
+		else if (resultRelInfo->ri_TrigDesc != NULL &&
+				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		{
+			miinfo->cur_lineno = buffer->linenos[i];
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], NIL, miinfo->transition_capture);
+		}
+
+		ExecClearTuple(slots[i]);
+	}
+
+	/* Mark that all slots are free */
+	buffer->nused = 0;
+
+	/* reset cur_lineno and line_buf_valid to what they were */
+	miinfo->line_buf_valid = line_buf_valid;
+	miinfo->cur_lineno = save_cur_lineno;
+}
+
+/*
+ * Drop used slots and free member for this buffer.
+ *
+ * The buffer must be flushed before cleanup.
+ */
+static inline void
+MultiInsertBufferCleanup(MultiInsertInfo *miinfo,
+							 MultiInsertBuffer *buffer)
+{
+	int			i;
+
+	/* Ensure buffer was flushed */
+	Assert(buffer->nused == 0);
+
+	/* Remove back-link to ourself */
+	buffer->resultRelInfo->ri_MultiInsertBuffer = NULL;
+
+	FreeBulkInsertState(buffer->bistate);
+
+	/* Since we only create slots on demand, just drop the non-null ones. */
+	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+
+	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
+							 miinfo->ti_options);
+
+	pfree(buffer);
+}
+
+/*
+ * Write out all stored tuples in all buffers out to the tables.
+ *
+ * Once flushed we also trim the tracked buffers list down to size by removing
+ * the buffers created earliest first.
+ *
+ * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
+ * used.  When cleaning up old buffers we'll never remove the one for
+ * 'curr_rri'.
+ */
+static inline void
+MultiInsertInfoFlush(MultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+{
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+	{
+		MultiInsertBuffer *buffer = (MultiInsertBuffer *) lfirst(lc);
+
+		MultiInsertBufferFlush(miinfo, buffer);
+	}
+
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+
+	/*
+	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
+	 * remove buffers starting with the ones we created first.  It seems less
+	 * likely that these older ones will be needed than the ones that were
+	 * just created.
+	 */
+	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+	{
+		MultiInsertBuffer *buffer;
+
+		buffer = (MultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+
+		/*
+		 * We never want to remove the buffer that's currently being used, so
+		 * if we happen to find that then move it to the end of the list.
+		 */
+		if (buffer->resultRelInfo == curr_rri)
+		{
+			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+			buffer = (MultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+		}
+
+		MultiInsertBufferCleanup(miinfo, buffer);
+		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+	}
+}
+
+/*
+ * Cleanup allocated buffers and free memory
+ */
+static inline void
+MultiInsertInfoCleanup(MultiInsertInfo *miinfo)
+{
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+		MultiInsertBufferCleanup(miinfo, lfirst(lc));
+
+	list_free(miinfo->multiInsertBuffers);
+}
+
+/*
+ * Get the next TupleTableSlot that the next tuple should be stored in.
+ *
+ * Callers must ensure that the buffer is not full.
+ *
+ * Note: 'miinfo' is unused but has been included for consistency with the
+ * other functions in this area.
+ */
+static inline TupleTableSlot *
+MultiInsertInfoNextFreeSlot(MultiInsertInfo *miinfo,
+								ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer = rri->ri_MultiInsertBuffer;
+	int			nused = buffer->nused;
+
+	Assert(buffer != NULL);
+	Assert(nused < MAX_BUFFERED_TUPLES);
+
+	if (buffer->slots[nused] == NULL)
+		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
+	return buffer->slots[nused];
+}
+
+/*
+ * Record the previously reserved TupleTableSlot that was reserved by
+ * MultiInsertInfoNextFreeSlot as being consumed.
+ */
+static inline void
+MultiInsertInfoStore(MultiInsertInfo *miinfo, ResultRelInfo *rri,
+						 TupleTableSlot *slot, int tuplen, uint64 lineno)
+{
+	MultiInsertBuffer *buffer = rri->ri_MultiInsertBuffer;
+
+	Assert(buffer != NULL);
+	Assert(slot == buffer->slots[buffer->nused]);
+
+	/* Store the line number so we can properly report any errors later */
+	buffer->linenos[buffer->nused] = lineno;
+
+	/* Record this slot as being used */
+	buffer->nused++;
+
+	/* Update how many tuples are stored and their size */
+	miinfo->bufferedTuples++;
+	miinfo->bufferedBytes += tuplen;
+}
+
 #endif							/* NODEMODIFYTABLE_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 61ba4c3666..477b326d06 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -14,6 +14,7 @@
 #ifndef EXECNODES_H
 #define EXECNODES_H
 
+#include "access/heapam.h"
 #include "access/tupconvert.h"
 #include "executor/instrument.h"
 #include "fmgr.h"
@@ -32,6 +33,9 @@
 #include "utils/tuplesort.h"
 #include "utils/tuplestore.h"
 
+/* This would be a circular inclusion */
+// #include "executor/nodeModifyTable.h"
+
 struct PlanState;				/* forward references in this file */
 struct ParallelHashJoinState;
 struct ExecRowMark;
@@ -39,8 +43,8 @@ struct ExprState;
 struct ExprContext;
 struct RangeTblEntry;			/* avoid including parsenodes.h here */
 struct ExprEvalStep;			/* avoid including execExpr.h everywhere */
-struct CopyMultiInsertBuffer;
-
+// struct MultiInsertBuffer;
+// struct MultiInsertInfo;
 
 /* ----------------
  *		ExprState node
@@ -498,8 +502,8 @@ typedef struct ResultRelInfo
 	 */
 	TupleConversionMap *ri_ChildToRootMap;
 
-	/* for use by copyfrom.c when performing multi-inserts */
-	struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
+	/* for use by copyfrom.c/modifyTable when performing multi-inserts */
+	struct MultiInsertBuffer *ri_MultiInsertBuffer;
 } ResultRelInfo;
 
 /* ----------------
@@ -1165,6 +1169,10 @@ typedef struct ModifyTableState
 	List	  **mt_arowmarks;	/* per-subplan ExecAuxRowMark lists */
 	EPQState	mt_epqstate;	/* for evaluating EvalPlanQual rechecks */
 	bool		fireBSTriggers; /* do we need to fire stmt triggers? */
+	BulkInsertState	bistate;	/* state for bulk insert like INSERT SELECT, when miinfo cannot be used */
+	ResultRelInfo	*prevResultRelInfo; /* last child inserted with bistate */
+	struct MultiInsertInfo	*miinfo;
+	size_t		ntuples;	/* Number of tuples inserted */
 
 	/*
 	 * Slot for storing tuples in the root partitioned table's rowtype during
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index da50ee3b67..bc4c1a4fc2 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -462,6 +462,49 @@ Partitions: part_aa_bb FOR VALUES IN ('aa', 'bb'),
             part_xx_yy FOR VALUES IN ('xx', 'yy'), PARTITIONED,
             part_default DEFAULT, PARTITIONED
 
+-- bulk inserts
+truncate hash_parted;
+begin;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+ a  
+----
+ 12
+ 11
+(2 rows)
+
+-- exercise bulk insert to partitions
+SET client_min_messages=debug1;
+insert into hash_parted select generate_series(1,9999);
+DEBUG:  enabling bulk insert
+DEBUG:  enabling multi insert
+RESET client_min_messages;
+select count(1) from hash_parted;
+ count 
+-------
+ 10001
+(1 row)
+
+commit;
+-- test that index was updated
+vacuum analyze hash_parted;
+explain(costs off)
+select * from hash_parted where a=13;
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Index Only Scan using hpart1_a_idx on hpart1 hash_parted
+   Index Cond: (a = 13)
+(2 rows)
+
+select * from hash_parted where a=13;
+ a  
+----
+ 13
+(1 row)
+
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 963faa1614..a74eb3826a 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -280,6 +280,26 @@ from hash_parted order by part;
 -- partitions
 \d+ list_parted
 
+-- bulk inserts
+truncate hash_parted;
+begin;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+-- exercise bulk insert to partitions
+SET client_min_messages=debug1;
+insert into hash_parted select generate_series(1,9999);
+RESET client_min_messages;
+select count(1) from hash_parted;
+commit;
+-- test that index was updated
+vacuum analyze hash_parted;
+explain(costs off)
+select * from hash_parted where a=13;
+select * from hash_parted where a=13;
+
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cf63acbf6f..72653c16e4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -422,8 +422,6 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyInsertMethod
-CopyMultiInsertBuffer
-CopyMultiInsertInfo
 CopyState
 CopyStateData
 CopyStmt
@@ -1388,6 +1386,8 @@ ModifyTableState
 MorphOpaque
 MsgType
 MultiAssignRef
+MultiInsertBuffer
+MultiInsertInfo
 MultiSortSupport
 MultiSortSupportData
 MultiXactId
-- 
2.17.0

>From 0dc38dced6fbf302f1174af60754341998c3ef4f Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Tue, 1 Dec 2020 23:11:31 -0600
Subject: [PATCH v8 2/4] WIP: Check for volatile defaults

We want to check whether any column *uses* a volatile default value, but after
parsing and rewriting, the information about which column values came from
defaults and which were specified explicitly appears to be lost.  insertedCols
doesn't appear to be useful for this.  So add a field to track whether a
TargetEntry was planned from a column default.
---
 src/backend/executor/nodeModifyTable.c | 64 ++++++++++++++++++++++++--
 src/backend/nodes/copyfuncs.c          |  1 +
 src/backend/nodes/equalfuncs.c         |  1 +
 src/backend/nodes/makefuncs.c          |  1 +
 src/backend/nodes/outfuncs.c           |  1 +
 src/backend/nodes/readfuncs.c          |  1 +
 src/backend/optimizer/util/tlist.c     |  1 +
 src/backend/rewrite/rewriteHandler.c   |  3 ++
 src/include/nodes/primnodes.h          |  2 +
 9 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 3428d9f48a..47a5271b91 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -50,6 +50,7 @@
 #include "foreign/fdwapi.h"
 #include "miscadmin.h"
 #include "nodes/nodeFuncs.h"
+#include "optimizer/optimizer.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -2258,6 +2259,61 @@ ExecModifyTable(PlanState *pstate)
 	return NULL;
 }
 
+/*
+ * Determine if a table has volatile column defaults which are used by a given
+ * planned statement (if the column is not specified or specified as DEFAULT).
+ * This works only for INSERT.
+ */
+static bool
+has_volatile_defaults(ResultRelInfo *resultRelInfo, ModifyTable *node)
+{
+	TupleDesc	tupDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
+	Plan		*plan;
+
+	Assert(list_length(node->plans) == 1);
+	plan = linitial(node->plans);
+
+	for (int attnum = 1; attnum <= tupDesc->natts; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+		Expr		*defexpr;
+		TargetEntry	*tle;
+
+		/* We don't need to check dropped/generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		tle = list_nth(plan->targetlist, attnum - 1);
+		Assert(tle != NULL);
+		Assert(tle->resno == attnum);
+
+		/*
+		 * If the column was specified with a non-default value, then don't
+		 * check the volatility of its default
+		 */
+		if (!tle->isdefault)
+			continue;
+
+		/* Check the column's default value if one exists */
+		defexpr = (Expr *) build_column_default(resultRelInfo->ri_RelationDesc, attnum);
+		if (defexpr == NULL)
+			continue;
+
+		/* Run the expression through planner */
+		// defexpr = expression_planner(defexpr);
+		// (void) ExecInitExpr(defexpr, NULL);
+		expression_planner(defexpr);
+
+		if (contain_volatile_functions_not_nextval((Node *) defexpr))
+		{
+			elog(DEBUG1, "found volatile att %d", attnum);
+			return true;
+		}
+	}
+
+	return false;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitModifyTable
  * ----------------------------------------------------------------
@@ -2352,10 +2408,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 		 * are any statement level insert triggers.
 		 */
 		mtstate->miinfo = NULL;
-	else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL
-			/* || cstate->volatile_defexprs */ )
-		// XXX contain_volatile_functions_not_nextval((Node *) defexpr);
-		/* Can't support multi-inserts to foreign tables or if there are any */
+	else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL ||
+			has_volatile_defaults(mtstate->rootResultRelInfo, node))
+		/* Can't support multi-inserts to foreign tables or if there are any
+		 * volatile default expressions in the table. */
 		mtstate->miinfo = NULL;
 	else
 	{
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 910906f639..3bfa59f1a5 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2177,6 +2177,7 @@ _copyTargetEntry(const TargetEntry *from)
 	COPY_SCALAR_FIELD(resorigtbl);
 	COPY_SCALAR_FIELD(resorigcol);
 	COPY_SCALAR_FIELD(resjunk);
+	COPY_SCALAR_FIELD(isdefault);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 687609f59e..84530468b8 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -769,6 +769,7 @@ _equalTargetEntry(const TargetEntry *a, const TargetEntry *b)
 	COMPARE_SCALAR_FIELD(resorigtbl);
 	COMPARE_SCALAR_FIELD(resorigcol);
 	COMPARE_SCALAR_FIELD(resjunk);
+	COMPARE_SCALAR_FIELD(isdefault);
 
 	return true;
 }
diff --git a/src/backend/nodes/makefuncs.c b/src/backend/nodes/makefuncs.c
index ee033ae779..1cd14fed3d 100644
--- a/src/backend/nodes/makefuncs.c
+++ b/src/backend/nodes/makefuncs.c
@@ -254,6 +254,7 @@ makeTargetEntry(Expr *expr,
 	tle->ressortgroupref = 0;
 	tle->resorigtbl = InvalidOid;
 	tle->resorigcol = 0;
+	tle->isdefault = false;
 
 	tle->resjunk = resjunk;
 
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 9c73c605a4..ef6d14e072 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1660,6 +1660,7 @@ _outTargetEntry(StringInfo str, const TargetEntry *node)
 	WRITE_OID_FIELD(resorigtbl);
 	WRITE_INT_FIELD(resorigcol);
 	WRITE_BOOL_FIELD(resjunk);
+	WRITE_BOOL_FIELD(isdefault);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 169d5581b9..ba2288f7b0 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1274,6 +1274,7 @@ _readTargetEntry(void)
 	READ_OID_FIELD(resorigtbl);
 	READ_INT_FIELD(resorigcol);
 	READ_BOOL_FIELD(resjunk);
+	READ_BOOL_FIELD(isdefault);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/util/tlist.c b/src/backend/optimizer/util/tlist.c
index 02a3c6b165..1043ee7b66 100644
--- a/src/backend/optimizer/util/tlist.c
+++ b/src/backend/optimizer/util/tlist.c
@@ -354,6 +354,7 @@ apply_tlist_labeling(List *dest_tlist, List *src_tlist)
 		dest_tle->resorigtbl = src_tle->resorigtbl;
 		dest_tle->resorigcol = src_tle->resorigcol;
 		dest_tle->resjunk = src_tle->resjunk;
+		dest_tle->isdefault = src_tle->isdefault;
 	}
 }
 
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
index c25012f325..9998db2268 100644
--- a/src/backend/rewrite/rewriteHandler.c
+++ b/src/backend/rewrite/rewriteHandler.c
@@ -986,10 +986,13 @@ rewriteTargetListIU(List *targetList,
 			}
 
 			if (new_expr)
+			{
 				new_tle = makeTargetEntry((Expr *) new_expr,
 										  attrno,
 										  pstrdup(NameStr(att_tup->attname)),
 										  false);
+				new_tle->isdefault = true;
+			}
 		}
 
 		/*
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index cdbe781c73..c5e626f175 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -1428,6 +1428,8 @@ typedef struct TargetEntry
 	AttrNumber	resorigcol;		/* column's number in source table */
 	bool		resjunk;		/* set to true to eliminate the attribute from
 								 * final target list */
+	bool		isdefault;		/* true if using the column default, either
+								 * by "DEFAULT" or omission of the column */
 } TargetEntry;
 
 
-- 
2.17.0

>From 7930b240635af0045e95591001eac9cba16cfd70 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 5 Dec 2020 08:52:14 -0600
Subject: [PATCH v8 3/4] COPY: flush multi-insert buffer based on accumulated
 size of tuples..

..rather than line length
---
 src/backend/commands/copyfrom.c        | 2 +-
 src/include/executor/nodeModifyTable.h | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index c4fe75df8e..a8bbfcc71f 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -683,7 +683,7 @@ CopyFrom(CopyFromState cstate)
 					/* Add this tuple to the tuple buffer */
 					MultiInsertInfoStore(&cstate->miinfo,
 											 resultRelInfo, myslot,
-											 cstate->line_buf.len,
+											 MemoryContextMemAllocated(myslot->tts_mcxt, true),
 											 cstate->miinfo.cur_lineno);
 
 					/*
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 71de7cf80e..e6bb27aade 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -41,10 +41,10 @@ extern void ExecReScanModifyTable(ModifyTableState *node);
 #define MAX_BUFFERED_TUPLES		1000
 
 /*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
+ * Flush buffers if there are >= this many bytes of tuples stored, as counted
+ * by the slot's memory contexts.
  */
-#define MAX_BUFFERED_BYTES		65535
+#define MAX_BUFFERED_BYTES		(1024*1024*8)
 
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
-- 
2.17.0

>From ff2a04f3958953f4956e8bbaa7d4ad434e623468 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Tue, 1 Dec 2020 17:20:25 -0600
Subject: [PATCH v8 4/4] WIP: check tuple size

Or maybe INSERT should flush the buffer based only on the *number* of tuples,
and not their size?
---
 src/backend/executor/nodeModifyTable.c | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 47a5271b91..b4f523e32d 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -703,7 +703,13 @@ ExecInsert(ModifyTableState *mtstate,
 			batchslot = MultiInsertInfoNextFreeSlot(mtstate->miinfo, resultRelInfo);
 			ExecCopySlot(batchslot, slot);
 
-			MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot, 0, 0); // XXX: tuplen/lineno
+			MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot,
+					// sizeof(void*) * batchslot->tts_nvalid, /* tuple size - underestimate */
+					MemoryContextMemAllocated(batchslot->tts_mcxt, true), /* tuple size */
+					mtstate->ntuples); /* lineno */
+
+			elog(DEBUG2, "bufferedBytes %d; tuples %ld",
+					mtstate->miinfo->bufferedBytes, mtstate->ntuples);
 
 			if (MultiInsertInfoIsFull(mtstate->miinfo))
 				MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo);
-- 
2.17.0
