On Mon, Feb 22, 2021 at 02:25:22AM +0000, houzj.f...@fujitsu.com wrote:
> > > Yes, you can see that I've copied the checks from copy.
> > > Like copy, some checks are done once, in ExecInitModifyTable, outside
> > > of the ExecModifyTable "loop".
> > >
> > > This squishes some commits together.
> > > And uses bistate for ON CONFLICT.
> > > And attempts to use memory context for tuple size.
> > 
> > Rebased on 9dc718bdf2b1a574481a45624d42b674332e2903
> > 
> > I guess my patch should/may be subsumed by this other one - I'm fine with
> > that.
> > https://commitfest.postgresql.org/31/2871/
> > 
> > Note that my interest here is just in bistate, to avoid leaving behind many
> > dirty buffers, not improved performance of COPY.
> 
> I am very interested in this patch, and I plan to do some experiments with
> it.
> Can you please rebase the patch, because it seems it cannot be applied to
> master now.

Thanks for your interest.

I was sitting on a rebased version since the bulk FDW patch will cause
conflicts, and since this should maybe be built on top of the table-am patch
(2871).  Have fun :)
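
If you want to see the new code paths fire, here is a minimal sketch, modelled
on the regression test added in 0001 (table names and row counts below are
illustrative only, not part of the patch):

    CREATE TABLE bulk_parted (a int) PARTITION BY HASH (a);
    CREATE TABLE bulk_parted_p0 PARTITION OF bulk_parted
        FOR VALUES WITH (MODULUS 2, REMAINDER 0);
    CREATE TABLE bulk_parted_p1 PARTITION OF bulk_parted
        FOR VALUES WITH (MODULUS 2, REMAINDER 1);

    -- With the patches applied, DEBUG1 reports "enabling bulk insert" and
    -- "enabling multi insert" once bulk_insert_ntuples rows (1000 by default)
    -- have been inserted.
    SET client_min_messages = debug1;
    INSERT INTO bulk_parted SELECT generate_series(1, 9999);
    RESET client_min_messages;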

-- 
Justin
>From e2b93b3b3aaa32f680193b42d91a80bab40768a4 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Fri, 8 May 2020 02:17:32 -0500
Subject: [PATCH v10 1/4] INSERT SELECT to use BulkInsertState and multi_insert

Rename structures;
Move the MultiInsert functions from copyfrom.c to (tentatively) nodeModifyTable.h;
Move transition_capture and cur_lineno into MultiInsertInfo (accessed via cstate->miinfo);

Dynamically switch to multi-insert mode based on the number of insertions.
This is intended to accommodate: 1) the original use case of INSERT using a
small ring buffer to avoid leaving behind dirty buffers; 2) automatically
using multi-inserts for batch operations; and 3) the old behavior of leaving
behind dirty buffers, which might allow INSERT to run more quickly, at the
cost of other backends having to write those dirty buffers out.

XXX: for (1), the bulk-insert state is used even when not multi-inserting,
including for a VALUES list.

TODO: use cstate->miinfo.cur_lineno++ instead of mtstate->miinfo->ntuples
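
Sketch of how the new GUC is meant to be used (per the guc.c hunk below: the
threshold defaults to 1000 and -1 disables the dynamic switch; dst/src are
placeholder names):

    SET bulk_insert_ntuples = 500;   -- switch to bulk/multi-insert after ~500 rows
    SET bulk_insert_ntuples = -1;    -- keep the old behavior: plain single inserts
    INSERT INTO dst SELECT * FROM src;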
---
 src/backend/commands/copyfrom.c          | 394 +----------------------
 src/backend/commands/copyfromparse.c     |  10 +-
 src/backend/executor/execMain.c          |   2 +-
 src/backend/executor/execPartition.c     |   2 +-
 src/backend/executor/nodeModifyTable.c   | 196 +++++++++--
 src/backend/utils/misc/guc.c             |  10 +
 src/include/commands/copyfrom_internal.h |   5 +-
 src/include/executor/nodeModifyTable.h   | 371 +++++++++++++++++++++
 src/include/nodes/execnodes.h            |  16 +-
 src/test/regress/expected/insert.out     |  43 +++
 src/test/regress/sql/insert.sql          |  20 ++
 src/tools/pgindent/typedefs.list         |   4 +-
 12 files changed, 658 insertions(+), 415 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 796ca7b3f7..5b8a1e4b61 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -46,54 +46,6 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list.  Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES		1000
-
-/*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
- */
-#define MAX_BUFFERED_BYTES		65535
-
-/* Trim the list of buffers back down to this number after flushing */
-#define MAX_PARTITION_BUFFERS	32
-
-/* Stores multi-insert data related to a single relation in CopyFrom. */
-typedef struct CopyMultiInsertBuffer
-{
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
-	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel */
-	int			nused;			/* number of 'slots' containing tuples */
-	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
-												 * stream */
-} CopyMultiInsertBuffer;
-
-/*
- * Stores one or many CopyMultiInsertBuffers and details about the size and
- * number of tuples which are stored in them.  This allows multiple buffers to
- * exist at once when COPYing into a partitioned table.
- */
-typedef struct CopyMultiInsertInfo
-{
-	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
-	int			bufferedTuples; /* number of tuples buffered over all buffers */
-	int			bufferedBytes;	/* number of bytes from all buffered tuples */
-	CopyFromState	cstate;			/* Copy state for this CopyMultiInsertInfo */
-	EState	   *estate;			/* Executor state used for COPY */
-	CommandId	mycid;			/* Command Id used for COPY */
-	int			ti_options;		/* table insert options */
-} CopyMultiInsertInfo;
-
-
 /* non-export function prototypes */
 static char *limit_printout_length(const char *str);
 
@@ -111,7 +63,7 @@ CopyFromErrorCallback(void *arg)
 	char		curlineno_str[32];
 
 	snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
-			 cstate->cur_lineno);
+			 cstate->miinfo.cur_lineno);
 
 	if (cstate->opts.binary)
 	{
@@ -206,317 +158,6 @@ limit_printout_length(const char *str)
 	return res;
 }
 
-/*
- * Allocate memory and initialize a new CopyMultiInsertBuffer for this
- * ResultRelInfo.
- */
-static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
-
-	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
-	buffer->resultRelInfo = rri;
-	buffer->bistate = GetBulkInsertState();
-	buffer->nused = 0;
-
-	return buffer;
-}
-
-/*
- * Make a new buffer for this ResultRelInfo.
- */
-static inline void
-CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
-
-	buffer = CopyMultiInsertBufferInit(rri);
-
-	/* Setup back-link so we can easily find this buffer again */
-	rri->ri_CopyMultiInsertBuffer = buffer;
-	/* Record that we're tracking this buffer */
-	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-}
-
-/*
- * Initialize an already allocated CopyMultiInsertInfo.
- *
- * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
- * for that table.
- */
-static void
-CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						CopyFromState cstate, EState *estate, CommandId mycid,
-						int ti_options)
-{
-	miinfo->multiInsertBuffers = NIL;
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
-	miinfo->cstate = cstate;
-	miinfo->estate = estate;
-	miinfo->mycid = mycid;
-	miinfo->ti_options = ti_options;
-
-	/*
-	 * Only setup the buffer when not dealing with a partitioned table.
-	 * Buffers for partitioned tables will just be setup when we need to send
-	 * tuples their way for the first time.
-	 */
-	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
-}
-
-/*
- * Returns true if the buffers are full
- */
-static inline bool
-CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
-{
-	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
-		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
-		return true;
-	return false;
-}
-
-/*
- * Returns true if we have no buffered tuples
- */
-static inline bool
-CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
-{
-	return miinfo->bufferedTuples == 0;
-}
-
-/*
- * Write the tuples stored in 'buffer' out to the table.
- */
-static inline void
-CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
-						   CopyMultiInsertBuffer *buffer)
-{
-	MemoryContext oldcontext;
-	int			i;
-	uint64		save_cur_lineno;
-	CopyFromState	cstate = miinfo->cstate;
-	EState	   *estate = miinfo->estate;
-	CommandId	mycid = miinfo->mycid;
-	int			ti_options = miinfo->ti_options;
-	bool		line_buf_valid = cstate->line_buf_valid;
-	int			nused = buffer->nused;
-	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
-	TupleTableSlot **slots = buffer->slots;
-
-	/*
-	 * Print error context information correctly, if one of the operations
-	 * below fail.
-	 */
-	cstate->line_buf_valid = false;
-	save_cur_lineno = cstate->cur_lineno;
-
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
-	{
-		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
-		 */
-		if (resultRelInfo->ri_NumIndices > 0)
-		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = buffer->linenos[i];
-			recheckIndexes =
-				ExecInsertIndexTuples(resultRelInfo,
-									  buffer->slots[i], estate, false, false,
-									  NULL, NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
-								 cstate->transition_capture);
-			list_free(recheckIndexes);
-		}
-
-		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
-		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-		{
-			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
-		}
-
-		ExecClearTuple(slots[i]);
-	}
-
-	/* Mark that all slots are free */
-	buffer->nused = 0;
-
-	/* reset cur_lineno and line_buf_valid to what they were */
-	cstate->line_buf_valid = line_buf_valid;
-	cstate->cur_lineno = save_cur_lineno;
-}
-
-/*
- * Drop used slots and free member for this buffer.
- *
- * The buffer must be flushed before cleanup.
- */
-static inline void
-CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
-							 CopyMultiInsertBuffer *buffer)
-{
-	int			i;
-
-	/* Ensure buffer was flushed */
-	Assert(buffer->nused == 0);
-
-	/* Remove back-link to ourself */
-	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
-
-	FreeBulkInsertState(buffer->bistate);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
-
-	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-							 miinfo->ti_options);
-
-	pfree(buffer);
-}
-
-/*
- * Write out all stored tuples in all buffers out to the tables.
- *
- * Once flushed we also trim the tracked buffers list down to size by removing
- * the buffers created earliest first.
- *
- * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
- * used.  When cleaning up old buffers we'll never remove the one for
- * 'curr_rri'.
- */
-static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
-{
-	ListCell   *lc;
-
-	foreach(lc, miinfo->multiInsertBuffers)
-	{
-		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
-
-		CopyMultiInsertBufferFlush(miinfo, buffer);
-	}
-
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
-
-	/*
-	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
-	 * remove buffers starting with the ones we created first.  It seems less
-	 * likely that these older ones will be needed than the ones that were
-	 * just created.
-	 */
-	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
-	{
-		CopyMultiInsertBuffer *buffer;
-
-		buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
-
-		/*
-		 * We never want to remove the buffer that's currently being used, so
-		 * if we happen to find that then move it to the end of the list.
-		 */
-		if (buffer->resultRelInfo == curr_rri)
-		{
-			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-			buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
-		}
-
-		CopyMultiInsertBufferCleanup(miinfo, buffer);
-		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-	}
-}
-
-/*
- * Cleanup allocated buffers and free memory
- */
-static inline void
-CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
-{
-	ListCell   *lc;
-
-	foreach(lc, miinfo->multiInsertBuffers)
-		CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
-
-	list_free(miinfo->multiInsertBuffers);
-}
-
-/*
- * Get the next TupleTableSlot that the next tuple should be stored in.
- *
- * Callers must ensure that the buffer is not full.
- *
- * Note: 'miinfo' is unused but has been included for consistency with the
- * other functions in this area.
- */
-static inline TupleTableSlot *
-CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
-								ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-	int			nused = buffer->nused;
-
-	Assert(buffer != NULL);
-	Assert(nused < MAX_BUFFERED_TUPLES);
-
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
-}
-
-/*
- * Record the previously reserved TupleTableSlot that was reserved by
- * CopyMultiInsertInfoNextFreeSlot as being consumed.
- */
-static inline void
-CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						 TupleTableSlot *slot, int tuplen, uint64 lineno)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-
-	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
-
-	/* Store the line number so we can properly report any errors later */
-	buffer->linenos[buffer->nused] = lineno;
-
-	/* Record this slot as being used */
-	buffer->nused++;
-
-	/* Update how many tuples are stored and their size */
-	miinfo->bufferedTuples++;
-	miinfo->bufferedBytes += tuplen;
-}
-
 /*
  * Copy FROM file to relation.
  */
@@ -538,7 +179,6 @@ CopyFrom(CopyFromState cstate)
 	int			ti_options = 0; /* start with default options for insert */
 	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
-	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
 	uint64		processed = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
@@ -726,7 +366,7 @@ CopyFrom(CopyFromState cstate)
 		 * For partitioned tables we can't support multi-inserts when there
 		 * are any statement level insert triggers. It might be possible to
 		 * allow partitioned tables with such triggers in the future, but for
-		 * now, CopyMultiInsertInfoFlush expects that any before row insert
+		 * now, MultiInsertInfoFlush expects that any before row insert
 		 * and statement level insert triggers are on the same relation.
 		 */
 		insertMethod = CIM_SINGLE;
@@ -774,7 +414,8 @@ CopyFrom(CopyFromState cstate)
 		else
 			insertMethod = CIM_MULTI;
 
-		CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
+		MultiInsertInfoInit(&cstate->miinfo, resultRelInfo,
+								cstate->transition_capture,
 								estate, mycid, ti_options);
 	}
 
@@ -837,7 +478,7 @@ CopyFrom(CopyFromState cstate)
 			Assert(resultRelInfo == target_resultRelInfo);
 			Assert(insertMethod == CIM_MULTI);
 
-			myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+			myslot = MultiInsertInfoNextFreeSlot(&cstate->miinfo,
 													 resultRelInfo);
 		}
 
@@ -906,18 +547,18 @@ CopyFrom(CopyFromState cstate)
 				/* Set the multi-insert buffer to use for this partition. */
 				if (leafpart_use_multi_insert)
 				{
-					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
-						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
+					if (resultRelInfo->ri_MultiInsertBuffer == NULL)
+						MultiInsertInfoSetupBuffer(&cstate->miinfo,
 													   resultRelInfo);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
-						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+						 !MultiInsertInfoIsEmpty(&cstate->miinfo))
 				{
 					/*
 					 * Flush pending inserts if this partition can't use
 					 * batching, so rows are visible to triggers etc.
 					 */
-					CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					MultiInsertInfoFlush(&cstate->miinfo, resultRelInfo);
 				}
 
 				if (bistate != NULL)
@@ -963,7 +604,7 @@ CopyFrom(CopyFromState cstate)
 				/* no other path available for partitioned table */
 				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
 
-				batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+				batchslot = MultiInsertInfoNextFreeSlot(&cstate->miinfo,
 															resultRelInfo);
 
 				if (map != NULL)
@@ -1043,17 +684,17 @@ CopyFrom(CopyFromState cstate)
 					ExecMaterializeSlot(myslot);
 
 					/* Add this tuple to the tuple buffer */
-					CopyMultiInsertInfoStore(&multiInsertInfo,
+					MultiInsertInfoStore(&cstate->miinfo,
 											 resultRelInfo, myslot,
 											 cstate->line_buf.len,
-											 cstate->cur_lineno);
+											 cstate->miinfo.cur_lineno);
 
 					/*
 					 * If enough inserts have queued up, then flush all
 					 * buffers out to their tables.
 					 */
-					if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
-						CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					if (MultiInsertInfoIsFull(&cstate->miinfo))
+						MultiInsertInfoFlush(&cstate->miinfo, resultRelInfo);
 				}
 				else
 				{
@@ -1114,8 +755,8 @@ CopyFrom(CopyFromState cstate)
 	/* Flush any remaining buffered tuples */
 	if (insertMethod != CIM_SINGLE)
 	{
-		if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
-			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+		if (!MultiInsertInfoIsEmpty(&cstate->miinfo))
+			MultiInsertInfoFlush(&cstate->miinfo, NULL);
 	}
 
 	/* Done, clean up */
@@ -1149,7 +790,7 @@ CopyFrom(CopyFromState cstate)
 
 	/* Tear down the multi-insert buffer data */
 	if (insertMethod != CIM_SINGLE)
-		CopyMultiInsertInfoCleanup(&multiInsertInfo);
+		MultiInsertInfoCleanup(&cstate->miinfo);
 
 	/* Close all the partitioned tables, leaf partitions, and their indices */
 	if (proute)
@@ -1328,7 +969,6 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->reached_eof = false;
 	cstate->eol_type = EOL_UNKNOWN;
 	cstate->cur_relname = RelationGetRelationName(cstate->rel);
-	cstate->cur_lineno = 0;
 	cstate->cur_attname = NULL;
 	cstate->cur_attval = NULL;
 
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 315b16fd7a..3fbf8cb431 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -460,14 +460,14 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
 	Assert(!cstate->opts.binary);
 
 	/* on input just throw the header line away */
-	if (cstate->cur_lineno == 0 && cstate->opts.header_line)
+	if (cstate->miinfo.cur_lineno == 0 && cstate->opts.header_line)
 	{
-		cstate->cur_lineno++;
+		cstate->miinfo.cur_lineno++;
 		if (CopyReadLine(cstate))
 			return false;		/* done */
 	}
 
-	cstate->cur_lineno++;
+	cstate->miinfo.cur_lineno++;
 
 	/* Actually read the line into memory here */
 	done = CopyReadLine(cstate);
@@ -608,7 +608,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 		int16		fld_count;
 		ListCell   *cur;
 
-		cstate->cur_lineno++;
+		cstate->miinfo.cur_lineno++;
 
 		if (!CopyGetInt16(cstate, &fld_count))
 		{
@@ -916,7 +916,7 @@ CopyReadLineText(CopyFromState cstate)
 			 * at all --- is cur_lineno a physical or logical count?)
 			 */
 			if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
-				cstate->cur_lineno++;
+				cstate->miinfo.cur_lineno++;
 		}
 
 		/* Process \r */
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index c74ce36ffb..245e173021 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1233,7 +1233,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
 													 * ExecInitRoutingInfo */
 	resultRelInfo->ri_PartitionTupleSlot = NULL;	/* ditto */
 	resultRelInfo->ri_ChildToRootMap = NULL;
-	resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+	resultRelInfo->ri_MultiInsertBuffer = NULL;
 }
 
 /*
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index b8da4c5967..b0f4b68b6e 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -1011,7 +1011,7 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
 
 	Assert(partRelInfo->ri_BatchSize >= 1);
 
-	partRelInfo->ri_CopyMultiInsertBuffer = NULL;
+	partRelInfo->ri_MultiInsertBuffer = NULL;
 
 	/*
 	 * Keep track of it in the PartitionTupleRouting->partitions array.
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 2993ba43e3..0907b3ebd5 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -43,6 +43,7 @@
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "commands/trigger.h"
+#include "commands/copy.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
@@ -79,6 +80,8 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
 											   ResultRelInfo *targetRelInfo,
 											   TupleTableSlot *slot,
 											   ResultRelInfo **partRelInfo);
+/* guc */
+int bulk_insert_ntuples = 1000;
 
 /*
  * Verify that the tuples to be produced by INSERT or UPDATE match the
@@ -397,6 +400,8 @@ ExecInsert(ModifyTableState *mtstate,
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
 	MemoryContext oldContext;
+	TupleTableSlot *batchslot = NULL;
+	bool	use_multi_insert = false;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -416,6 +421,66 @@ ExecInsert(ModifyTableState *mtstate,
 
 	resultRelationDesc = resultRelInfo->ri_RelationDesc;
 
+	/* Use bulk insert after a threshold number of tuples */
+	// XXX: maybe this should only be done if it's not a partitioned table or
+	// if the partitions don't support miinfo, which uses its own bistates
+	mtstate->ntuples++;
+	if (mtstate->bistate == NULL &&
+			mtstate->operation == CMD_INSERT &&
+			mtstate->ntuples > bulk_insert_ntuples &&
+			bulk_insert_ntuples >= 0)
+	{
+		elog(DEBUG1, "enabling bulk insert");
+		mtstate->bistate = GetBulkInsertState();
+	}
+
+	if (!mtstate->miinfo)
+	{
+		/*
+		 * Multi-inserts aren't possible for this statement at all, so don't
+		 * check any further.
+		 */
+	} else if (proute == NULL)
+	{
+		if (mtstate->miinfo->ntuples++ >= bulk_insert_ntuples &&
+			bulk_insert_ntuples >= 0)
+			use_multi_insert = true;
+	}
+	else
+	{
+		/*
+		 * If a partitioned table itself allows multi-insert, and bistate
+		 * indicates we've inserted the threshold number of tuples, check if
+		 * the partition also supports it.
+		 */
+
+		/* Determine which triggers exist on this partition */
+		// XXX copyfrom.c only checks triggers when the partition changes,
+		// so maybe use_multi_insert should be in mtstate ?
+		bool has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+				resultRelInfo->ri_TrigDesc->trig_insert_before_row);
+
+		bool has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+				resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
+
+		/*
+		 * Disable multi-inserts when the partition has BEFORE/INSTEAD
+		 * OF triggers, or if the partition is a foreign partition.
+		 * The number of tuples eligible for multi-insert is tracked separately
+		 * from the total number of tuples in case it's not supported for some
+		 * partitions.
+		 */
+		if (!has_before_insert_row_trig &&
+			!has_instead_insert_row_trig &&
+			resultRelInfo->ri_FdwRoutine == NULL &&
+			mtstate->miinfo->ntuples++ >= bulk_insert_ntuples &&
+			bulk_insert_ntuples >= 0)
+			use_multi_insert = true;
+	}
+
+	if (use_multi_insert && mtstate->miinfo->ntuples - 1 == bulk_insert_ntuples)
+		elog(DEBUG1, "enabling multi insert");
+
 	/*
 	 * BEFORE ROW INSERT Triggers.
 	 *
@@ -651,7 +716,7 @@ ExecInsert(ModifyTableState *mtstate,
 			table_tuple_insert_speculative(resultRelationDesc, slot,
 										   estate->es_output_cid,
 										   0,
-										   NULL,
+										   mtstate->bistate,
 										   specToken);
 
 			/* insert index entries for tuple */
@@ -686,12 +751,39 @@ ExecInsert(ModifyTableState *mtstate,
 
 			/* Since there was no insertion conflict, we're done */
 		}
+		else if (use_multi_insert)
+		{
+			if (resultRelInfo->ri_MultiInsertBuffer == NULL)
+				MultiInsertInfoSetupBuffer(mtstate->miinfo, resultRelInfo);
+
+			batchslot = MultiInsertInfoNextFreeSlot(mtstate->miinfo, resultRelInfo);
+			ExecCopySlot(batchslot, slot);
+
+			MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot, 0, 0); // XXX: tuplen/lineno
+
+			if (MultiInsertInfoIsFull(mtstate->miinfo))
+				MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo);
+		}
 		else
 		{
+			if (proute && mtstate->prevResultRelInfo != resultRelInfo)
+			{
+				if (mtstate->bistate)
+					ReleaseBulkInsertStatePin(mtstate->bistate);
+				mtstate->prevResultRelInfo = resultRelInfo;
+			}
+
+			/*
+			 * Flush pending inserts if this partition can't use
+			 * batching, so rows are visible to triggers etc.
+			 */
+			if (mtstate->miinfo)
+				MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo);
+
 			/* insert the tuple normally */
 			table_tuple_insert(resultRelationDesc, slot,
 							   estate->es_output_cid,
-							   0, NULL);
+							   0, mtstate->bistate);
 
 			/* insert index entries for tuple */
 			if (resultRelInfo->ri_NumIndices > 0)
@@ -704,32 +796,36 @@ ExecInsert(ModifyTableState *mtstate,
 	if (canSetTag)
 		(estate->es_processed)++;
 
-	/*
-	 * If this insert is the result of a partition key update that moved the
-	 * tuple to a new partition, put this row into the transition NEW TABLE,
-	 * if there is one. We need to do this separately for DELETE and INSERT
-	 * because they happen on different tables.
-	 */
-	ar_insert_trig_tcs = mtstate->mt_transition_capture;
-	if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
-		&& mtstate->mt_transition_capture->tcs_update_new_table)
+	/* Triggers were already run in the batch insert case */
+	if (batchslot == NULL)
 	{
-		ExecARUpdateTriggers(estate, resultRelInfo, NULL,
-							 NULL,
-							 slot,
-							 NULL,
-							 mtstate->mt_transition_capture);
-
 		/*
-		 * We've already captured the NEW TABLE row, so make sure any AR
-		 * INSERT trigger fired below doesn't capture it again.
+		 * If this insert is the result of a partition key update that moved the
+		 * tuple to a new partition, put this row into the transition NEW TABLE,
+		 * if there is one. We need to do this separately for DELETE and INSERT
+		 * because they happen on different tables.
 		 */
-		ar_insert_trig_tcs = NULL;
-	}
+		ar_insert_trig_tcs = mtstate->mt_transition_capture;
+		if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
+			&& mtstate->mt_transition_capture->tcs_update_new_table)
+		{
+			ExecARUpdateTriggers(estate, resultRelInfo, NULL,
+								 NULL,
+								 slot,
+								 NULL,
+								 mtstate->mt_transition_capture);
+
+			/*
+			 * We've already captured the NEW TABLE row, so make sure any AR
+			 * INSERT trigger fired below doesn't capture it again.
+			 */
+			ar_insert_trig_tcs = NULL;
+		}
 
-	/* AFTER ROW INSERT Triggers */
-	ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
-						 ar_insert_trig_tcs);
+		/* AFTER ROW INSERT Triggers */
+		ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+							 ar_insert_trig_tcs);
+	}
 
 	list_free(recheckIndexes);
 
@@ -2372,6 +2468,45 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
 	mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans);
 	mtstate->mt_nplans = nplans;
+	mtstate->bistate = NULL;
+
+	/*
+	 * Set miinfo if it can support multi-insert. This is the equivalent of
+	 * CIM_MULTI_* et al in copyfrom.c
+	 */
+
+	if (operation != CMD_INSERT ||
+			node->onConflictAction != ONCONFLICT_NONE)
+		mtstate->miinfo = NULL;
+	else if (mtstate->rootResultRelInfo->ri_TrigDesc != NULL &&
+			(mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+			 // mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_after_row || // XXX or any row level triggers at all?
+			 mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_instead_row))
+		/*
+		 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
+		 * triggers on the table.
+		 */
+		mtstate->miinfo = NULL;
+	else if (node->rootRelation > 0 &&
+			mtstate->rootResultRelInfo->ri_TrigDesc != NULL &&
+			 mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_new_table)
+		/*
+		 * For partitioned tables we can't support multi-inserts when there
+		 * are any statement level insert triggers.
+		 */
+		mtstate->miinfo = NULL;
+	else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL
+			/* || cstate->volatile_defexprs */ )
+		// XXX contain_volatile_functions_not_nextval((Node *) defexpr);
+		/* Can't support multi-inserts to foreign tables or if there are any */
+		mtstate->miinfo = NULL;
+	else
+	{
+		mtstate->miinfo = palloc0(sizeof(*mtstate->miinfo));
+		MultiInsertInfoInit(mtstate->miinfo, mtstate->rootResultRelInfo,
+				mtstate->mt_transition_capture,
+				estate, GetCurrentCommandId(true), 0);
+	}
 
 	/* set up epqstate with dummy subplan data for the moment */
 	EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam);
@@ -2864,6 +2999,19 @@ ExecEndModifyTable(ModifyTableState *node)
 														   resultRelInfo);
 	}
 
+	if (node->bistate)
+	{
+		FreeBulkInsertState(node->bistate);
+		table_finish_bulk_insert(node->rootResultRelInfo->ri_RelationDesc, 0);
+	}
+
+	if (node->miinfo)
+	{
+		if (!MultiInsertInfoIsEmpty(node->miinfo))
+			 MultiInsertInfoFlush(node->miinfo, node->resultRelInfo); // root ?
+		MultiInsertInfoCleanup(node->miinfo);
+	}
+
 	/*
 	 * Close all the partitioned tables, leaf partitions, and their indices
 	 * and release the slot used for tuple routing, if set.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 00018abb7d..e08577851f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -54,6 +54,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/cost.h"
+#include "executor/nodeModifyTable.h"
 #include "optimizer/geqo.h"
 #include "optimizer/optimizer.h"
 #include "optimizer/paths.h"
@@ -3445,6 +3446,15 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"bulk_insert_ntuples", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Enable bulk insertions after this number of tuples."),
+			gettext_noop("A ring buffer of limited size will be used and updates will be done in batches."),
+		},
+		&bulk_insert_ntuples,
+		1000, -1, INT_MAX,
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index e37942df39..372414b1c0 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -15,6 +15,7 @@
 #define COPYFROM_INTERNAL_H
 
 #include "commands/copy.h"
+#include "executor/nodeModifyTable.h"
 #include "commands/trigger.h"
 
 /*
@@ -92,10 +93,12 @@ typedef struct CopyFromStateData
 
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
-	uint64		cur_lineno;		/* line number for error messages */
 	const char *cur_attname;	/* current att for error messages */
 	const char *cur_attval;		/* current att value for error messages */
 
+	/* For bulk inserts and for error callback */
+	MultiInsertInfo miinfo;
+
 	/*
 	 * Working state
 	 */
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 83e2965531..30542a542a 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -13,8 +13,12 @@
 #ifndef NODEMODIFYTABLE_H
 #define NODEMODIFYTABLE_H
 
+#include "commands/trigger.h"
+#include "executor/executor.h" // XXX
 #include "nodes/execnodes.h"
 
+extern PGDLLIMPORT int bulk_insert_ntuples;
+
 extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 									   EState *estate, TupleTableSlot *slot,
 									   CmdType cmdtype);
@@ -23,4 +27,371 @@ extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate,
 extern void ExecEndModifyTable(ModifyTableState *node);
 extern void ExecReScanModifyTable(ModifyTableState *node);
 
+/* Bulk insert stuff which used to live in copy.c */
+
+/*
+ * No more than this many tuples per MultiInsertBuffer
+ *
+ * Caution: Don't make this too big, as we could end up with this many
+ * MultiInsertBuffer items stored in MultiInsertInfo's
+ * multiInsertBuffers list.  Increasing this can cause quadratic growth in
+ * memory requirements during copies into partitioned tables with a large
+ * number of partitions.
+ */
+#define MAX_BUFFERED_TUPLES		1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size, of tuples stored.
+ */
+#define MAX_BUFFERED_BYTES		65535
+
+/* Trim the list of buffers back down to this number after flushing */
+#define MAX_PARTITION_BUFFERS	32
+
+/* Stores multi-insert data related to a single relation in CopyFrom. */
+typedef struct MultiInsertBuffer
+{
+	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
+	BulkInsertState bistate;	/* BulkInsertState for this rel */
+	int			nused;			/* number of 'slots' containing tuples */
+	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
+												 * stream */
+} MultiInsertBuffer;
+
+/*
+ * Stores one or many MultiInsertBuffers and details about the size and
+ * number of tuples which are stored in them.  This allows multiple buffers to
+ * exist at once when COPY/INSERTing into a partitioned table.
+ */
+typedef struct MultiInsertInfo
+{
+	List	   *multiInsertBuffers; /* List of tracked MultiInsertBuffers */
+	int			bufferedTuples; /* number of tuples buffered over all buffers */
+	int			bufferedBytes;	/* number of bytes from all buffered tuples */
+	TransitionCaptureState	*transition_capture;
+	EState	   *estate;			/* Executor state */
+	CommandId	mycid;			/* Command Id */
+	int			ti_options;		/* table insert options */
+	size_t		ntuples;		/* Number of rows *eligible* for multi-insert */
+
+	/* Line number for errors in copyfrom.c */
+	uint64		cur_lineno;
+	bool		line_buf_valid;
+} MultiInsertInfo;
+
+
+/*
+ * Allocate memory and initialize a new MultiInsertBuffer for this
+ * ResultRelInfo.
+ */
+static MultiInsertBuffer *
+MultiInsertBufferInit(ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer;
+
+	buffer = (MultiInsertBuffer *) palloc(sizeof(MultiInsertBuffer));
+	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+	buffer->resultRelInfo = rri;
+	buffer->bistate = GetBulkInsertState();
+	buffer->nused = 0;
+
+	return buffer;
+}
+
+/*
+ * Make a new buffer for this ResultRelInfo.
+ */
+static inline void
+MultiInsertInfoSetupBuffer(MultiInsertInfo *miinfo,
+							   ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer;
+
+	buffer = MultiInsertBufferInit(rri);
+
+	/* Setup back-link so we can easily find this buffer again */
+	rri->ri_MultiInsertBuffer = buffer;
+	/* Record that we're tracking this buffer */
+	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+}
+
+/*
+ * Initialize an already allocated MultiInsertInfo.
+ *
+ * If rri is a non-partitioned table then a MultiInsertBuffer is set up
+ * for that table.
+ */
+static inline void
+MultiInsertInfoInit(MultiInsertInfo *miinfo, ResultRelInfo *rri,
+						TransitionCaptureState *transition_capture,
+						EState *estate, CommandId mycid, int ti_options)
+{
+	miinfo->multiInsertBuffers = NIL;
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+	miinfo->transition_capture = transition_capture;
+	miinfo->estate = estate;
+	miinfo->mycid = mycid;
+	miinfo->ti_options = ti_options;
+	miinfo->cur_lineno = 0;
+
+	/*
+	 * Only setup the buffer when not dealing with a partitioned table.
+	 * Buffers for partitioned tables will just be setup when we need to send
+	 * tuples their way for the first time.
+	 */
+	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		MultiInsertInfoSetupBuffer(miinfo, rri);
+}
+
+/*
+ * Returns true if the buffers are full
+ */
+static inline bool
+MultiInsertInfoIsFull(MultiInsertInfo *miinfo)
+{
+	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
+		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+		return true;
+	return false;
+}
+
+/*
+ * Returns true if we have no buffered tuples
+ */
+static inline bool
+MultiInsertInfoIsEmpty(MultiInsertInfo *miinfo)
+{
+	return miinfo->bufferedTuples == 0;
+}
+
+/*
+ * Write the tuples stored in 'buffer' out to the table.
+ */
+static inline void
+MultiInsertBufferFlush(MultiInsertInfo *miinfo,
+						   MultiInsertBuffer *buffer)
+{
+	MemoryContext oldcontext;
+	int			i;
+	uint64		save_cur_lineno;
+	EState	   *estate = miinfo->estate;
+	CommandId	mycid = miinfo->mycid;
+	int			ti_options = miinfo->ti_options;
+	bool		line_buf_valid = miinfo->line_buf_valid;
+	int			nused = buffer->nused;
+	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
+	TupleTableSlot **slots = buffer->slots;
+
+	/*
+	 * Print error context information correctly, if one of the operations
+	 * below fail.
+	 */
+	miinfo->line_buf_valid = false;
+	save_cur_lineno = miinfo->cur_lineno;
+
+	/*
+	 * table_multi_insert may leak memory, so switch to short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); // XXX requires executor.h
+	table_multi_insert(resultRelInfo->ri_RelationDesc,
+					   slots,
+					   nused,
+					   mycid,
+					   ti_options,
+					   buffer->bistate);
+	MemoryContextSwitchTo(oldcontext);
+
+	for (i = 0; i < nused; i++)
+	{
+		/*
+		 * If there are any indexes, update them for all the inserted tuples,
+		 * and run AFTER ROW INSERT triggers.
+		 */
+		if (resultRelInfo->ri_NumIndices > 0)
+		{
+			List	   *recheckIndexes;
+
+			miinfo->cur_lineno = buffer->linenos[i];
+			recheckIndexes =
+				ExecInsertIndexTuples(resultRelInfo,
+									  buffer->slots[i], estate, false, false,
+									  NULL, NIL);
+
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], recheckIndexes,
+								 miinfo->transition_capture);
+			list_free(recheckIndexes);
+		}
+
+		/*
+		 * There's no indexes, but see if we need to run AFTER ROW INSERT
+		 * triggers anyway.
+		 */
+		else if (resultRelInfo->ri_TrigDesc != NULL &&
+				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		{
+			miinfo->cur_lineno = buffer->linenos[i];
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], NIL, miinfo->transition_capture);
+		}
+
+		ExecClearTuple(slots[i]);
+	}
+
+	/* Mark that all slots are free */
+	buffer->nused = 0;
+
+	/* reset cur_lineno and line_buf_valid to what they were */
+	miinfo->line_buf_valid = line_buf_valid;
+	miinfo->cur_lineno = save_cur_lineno;
+}
+
+/*
+ * Drop used slots and free member for this buffer.
+ *
+ * The buffer must be flushed before cleanup.
+ */
+static inline void
+MultiInsertBufferCleanup(MultiInsertInfo *miinfo,
+							 MultiInsertBuffer *buffer)
+{
+	int			i;
+
+	/* Ensure buffer was flushed */
+	Assert(buffer->nused == 0);
+
+	/* Remove back-link to ourself */
+	buffer->resultRelInfo->ri_MultiInsertBuffer = NULL;
+
+	FreeBulkInsertState(buffer->bistate);
+
+	/* Since we only create slots on demand, just drop the non-null ones. */
+	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+
+	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
+							 miinfo->ti_options);
+
+	pfree(buffer);
+}
+
+/*
+ * Write out all stored tuples in all buffers out to the tables.
+ *
+ * Once flushed we also trim the tracked buffers list down to size by removing
+ * the buffers created earliest first.
+ *
+ * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
+ * used.  When cleaning up old buffers we'll never remove the one for
+ * 'curr_rri'.
+ */
+static inline void
+MultiInsertInfoFlush(MultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+{
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+	{
+		MultiInsertBuffer *buffer = (MultiInsertBuffer *) lfirst(lc);
+
+		MultiInsertBufferFlush(miinfo, buffer);
+	}
+
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+
+	/*
+	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
+	 * remove buffers starting with the ones we created first.  It seems less
+	 * likely that these older ones will be needed than the ones that were
+	 * just created.
+	 */
+	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+	{
+		MultiInsertBuffer *buffer;
+
+		buffer = (MultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+
+		/*
+		 * We never want to remove the buffer that's currently being used, so
+		 * if we happen to find that then move it to the end of the list.
+		 */
+		if (buffer->resultRelInfo == curr_rri)
+		{
+			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+			buffer = (MultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+		}
+
+		MultiInsertBufferCleanup(miinfo, buffer);
+		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+	}
+}
+
+/*
+ * Cleanup allocated buffers and free memory
+ */
+static inline void
+MultiInsertInfoCleanup(MultiInsertInfo *miinfo)
+{
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+		MultiInsertBufferCleanup(miinfo, lfirst(lc));
+
+	list_free(miinfo->multiInsertBuffers);
+}
+
+/*
+ * Get the next TupleTableSlot that the next tuple should be stored in.
+ *
+ * Callers must ensure that the buffer is not full.
+ *
+ * Note: 'miinfo' is unused but has been included for consistency with the
+ * other functions in this area.
+ */
+static inline TupleTableSlot *
+MultiInsertInfoNextFreeSlot(MultiInsertInfo *miinfo,
+								ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer = rri->ri_MultiInsertBuffer;
+	int			nused = buffer->nused;
+
+	Assert(buffer != NULL);
+	Assert(nused < MAX_BUFFERED_TUPLES);
+
+	if (buffer->slots[nused] == NULL)
+		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
+	return buffer->slots[nused];
+}
+
+/*
+ * Record the previously reserved TupleTableSlot that was reserved by
+ * MultiInsertInfoNextFreeSlot as being consumed.
+ */
+static inline void
+MultiInsertInfoStore(MultiInsertInfo *miinfo, ResultRelInfo *rri,
+						 TupleTableSlot *slot, int tuplen, uint64 lineno)
+{
+	MultiInsertBuffer *buffer = rri->ri_MultiInsertBuffer;
+
+	Assert(buffer != NULL);
+	Assert(slot == buffer->slots[buffer->nused]);
+
+	/* Store the line number so we can properly report any errors later */
+	buffer->linenos[buffer->nused] = lineno;
+
+	/* Record this slot as being used */
+	buffer->nused++;
+
+	/* Update how many tuples are stored and their size */
+	miinfo->bufferedTuples++;
+	miinfo->bufferedBytes += tuplen;
+}
+
 #endif							/* NODEMODIFYTABLE_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 943931f65d..19668bbf66 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -14,6 +14,7 @@
 #ifndef EXECNODES_H
 #define EXECNODES_H
 
+#include "access/heapam.h"
 #include "access/tupconvert.h"
 #include "executor/instrument.h"
 #include "fmgr.h"
@@ -32,6 +33,9 @@
 #include "utils/tuplesort.h"
 #include "utils/tuplestore.h"
 
+/* This would be a circular inclusion */
+// #include "executor/nodeModifyTable.h"
+
 struct PlanState;				/* forward references in this file */
 struct ParallelHashJoinState;
 struct ExecRowMark;
@@ -39,8 +43,8 @@ struct ExprState;
 struct ExprContext;
 struct RangeTblEntry;			/* avoid including parsenodes.h here */
 struct ExprEvalStep;			/* avoid including execExpr.h everywhere */
-struct CopyMultiInsertBuffer;
-
+// struct MultiInsertBuffer;
+// struct MultiInsertInfo;
 
 /* ----------------
  *		ExprState node
@@ -511,8 +515,8 @@ typedef struct ResultRelInfo
 	 */
 	TupleConversionMap *ri_ChildToRootMap;
 
-	/* for use by copyfrom.c when performing multi-inserts */
-	struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
+	/* for use by copyfrom.c/modifyTable when performing multi-inserts */
+	struct MultiInsertBuffer *ri_MultiInsertBuffer;
 } ResultRelInfo;
 
 /* ----------------
@@ -1178,6 +1182,10 @@ typedef struct ModifyTableState
 	List	  **mt_arowmarks;	/* per-subplan ExecAuxRowMark lists */
 	EPQState	mt_epqstate;	/* for evaluating EvalPlanQual rechecks */
 	bool		fireBSTriggers; /* do we need to fire stmt triggers? */
+	BulkInsertState	bistate;	/* state for bulk insert like INSERT SELECT, when miinfo cannot be used */
+	ResultRelInfo	*prevResultRelInfo; /* last child inserted with bistate */
+	struct MultiInsertInfo	*miinfo;
+	size_t		ntuples;	/* Number of tuples inserted */
 
 	/*
 	 * Slot for storing tuples in the root partitioned table's rowtype during
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index da50ee3b67..bc4c1a4fc2 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -462,6 +462,49 @@ Partitions: part_aa_bb FOR VALUES IN ('aa', 'bb'),
             part_xx_yy FOR VALUES IN ('xx', 'yy'), PARTITIONED,
             part_default DEFAULT, PARTITIONED
 
+-- bulk inserts
+truncate hash_parted;
+begin;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+ a  
+----
+ 12
+ 11
+(2 rows)
+
+-- exercise bulk insert to partitions
+SET client_min_messages=debug1;
+insert into hash_parted select generate_series(1,9999);
+DEBUG:  enabling bulk insert
+DEBUG:  enabling multi insert
+RESET client_min_messages;
+select count(1) from hash_parted;
+ count 
+-------
+ 10001
+(1 row)
+
+commit;
+-- test that index was updated
+vacuum analyze hash_parted;
+explain(costs off)
+select * from hash_parted where a=13;
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Index Only Scan using hpart1_a_idx on hpart1 hash_parted
+   Index Cond: (a = 13)
+(2 rows)
+
+select * from hash_parted where a=13;
+ a  
+----
+ 13
+(1 row)
+
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 963faa1614..a74eb3826a 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -280,6 +280,26 @@ from hash_parted order by part;
 -- partitions
 \d+ list_parted
 
+-- bulk inserts
+truncate hash_parted;
+begin;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+-- exercise bulk insert to partitions
+SET client_min_messages=debug1;
+insert into hash_parted select generate_series(1,9999);
+RESET client_min_messages;
+select count(1) from hash_parted;
+commit;
+-- test that index was updated
+vacuum analyze hash_parted;
+explain(costs off)
+select * from hash_parted where a=13;
+select * from hash_parted where a=13;
+
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index bab4f3adb3..e07588fb6c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -423,8 +423,6 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyInsertMethod
-CopyMultiInsertBuffer
-CopyMultiInsertInfo
 CopyState
 CopyStateData
 CopyStmt
@@ -1402,6 +1400,8 @@ ModifyTableState
 MorphOpaque
 MsgType
 MultiAssignRef
+MultiInsertBuffer
+MultiInsertInfo
 MultiSortSupport
 MultiSortSupportData
 MultiXactId
-- 
2.17.0

>From 2799ea649a720305d74bc77baeed5387cdcbc49d Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Tue, 1 Dec 2020 23:11:31 -0600
Subject: [PATCH v10 2/4] WIP: Check for volatile defaults

We want to check whether any column *uses* a volatile default value, but after
parsing and rewriting, the information about which column values are defaults
and which were specified explicitly appears to be lost.  insertedCols doesn't
appear to be useful for this.  So add a field to track whether a TargetEntry
was planned from a column default.
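
For example (hypothetical table; this is the case the new TargetEntry flag is
meant to detect):

    CREATE TABLE t (a int, b float8 DEFAULT random());
    -- b falls back to its volatile default here, so with this patch the
    -- INSERT avoids multi-insert; listing b explicitly keeps it eligible.
    INSERT INTO t (a) SELECT generate_series(1, 10000);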
---
 src/backend/executor/nodeModifyTable.c | 64 ++++++++++++++++++++++++--
 src/backend/nodes/copyfuncs.c          |  1 +
 src/backend/nodes/equalfuncs.c         |  1 +
 src/backend/nodes/makefuncs.c          |  1 +
 src/backend/nodes/outfuncs.c           |  1 +
 src/backend/nodes/readfuncs.c          |  1 +
 src/backend/optimizer/util/tlist.c     |  1 +
 src/backend/rewrite/rewriteHandler.c   |  3 ++
 src/include/nodes/primnodes.h          |  2 +
 9 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 0907b3ebd5..e018946c0e 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -50,6 +50,7 @@
 #include "foreign/fdwapi.h"
 #include "miscadmin.h"
 #include "nodes/nodeFuncs.h"
+#include "optimizer/optimizer.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -2401,6 +2402,61 @@ ExecModifyTable(PlanState *pstate)
 	return NULL;
 }
 
+/*
+ * Determine if a table has volatile column defaults which are used by a given
+ * planned statement (if the column is not specified or specified as DEFAULT).
+ * This works only for INSERT.
+ */
+static bool
+has_volatile_defaults(ResultRelInfo *resultRelInfo, ModifyTable *node)
+{
+	TupleDesc	tupDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
+	Plan		*plan;
+
+	Assert(list_length(node->plans) == 1);
+	plan = linitial(node->plans);
+
+	for (int attnum = 1; attnum <= tupDesc->natts; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+		Expr		*defexpr;
+		TargetEntry	*tle;
+
+		/* We don't need to check dropped/generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		tle = list_nth(plan->targetlist, attnum - 1);
+		Assert(tle != NULL);
+		Assert(tle->resno == attnum);
+
+		/*
+		 * If the column was specified with a non-default value, then don't
+		 * check the volatility of its default
+		 */
+		if (!tle->isdefault)
+			continue;
+
+		/* Check the column's default value if one exists */
+		defexpr = (Expr *) build_column_default(resultRelInfo->ri_RelationDesc, attnum);
+		if (defexpr == NULL)
+			continue;
+
+		/* Run the expression through planner */
+		// defexpr = expression_planner(defexpr);
+		// (void) ExecInitExpr(defexpr, NULL);
+		expression_planner(defexpr);
+
+		if (contain_volatile_functions_not_nextval((Node *) defexpr))
+		{
+			elog(DEBUG1, "found volatile att %d", attnum);
+			return true;
+		}
+	}
+
+	return false;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitModifyTable
  * ----------------------------------------------------------------
@@ -2495,10 +2551,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 		 * are any statement level insert triggers.
 		 */
 		mtstate->miinfo = NULL;
-	else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL
-			/* || cstate->volatile_defexprs */ )
-		// XXX contain_volatile_functions_not_nextval((Node *) defexpr);
-		/* Can't support multi-inserts to foreign tables or if there are any */
+	else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL ||
+			has_volatile_defaults(mtstate->rootResultRelInfo, node))
+		/* Can't support multi-inserts to foreign tables or if there are any
+		 * volatile default expressions in the table. */
 		mtstate->miinfo = NULL;
 	else
 	{
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 65bbc18ecb..0088539df7 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2178,6 +2178,7 @@ _copyTargetEntry(const TargetEntry *from)
 	COPY_SCALAR_FIELD(resorigtbl);
 	COPY_SCALAR_FIELD(resorigcol);
 	COPY_SCALAR_FIELD(resjunk);
+	COPY_SCALAR_FIELD(isdefault);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index c2d73626fc..12260f803f 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -770,6 +770,7 @@ _equalTargetEntry(const TargetEntry *a, const TargetEntry *b)
 	COMPARE_SCALAR_FIELD(resorigtbl);
 	COMPARE_SCALAR_FIELD(resorigcol);
 	COMPARE_SCALAR_FIELD(resjunk);
+	COMPARE_SCALAR_FIELD(isdefault);
 
 	return true;
 }
diff --git a/src/backend/nodes/makefuncs.c b/src/backend/nodes/makefuncs.c
index 01c110cd2f..aeeba7032f 100644
--- a/src/backend/nodes/makefuncs.c
+++ b/src/backend/nodes/makefuncs.c
@@ -254,6 +254,7 @@ makeTargetEntry(Expr *expr,
 	tle->ressortgroupref = 0;
 	tle->resorigtbl = InvalidOid;
 	tle->resorigcol = 0;
+	tle->isdefault = false;
 
 	tle->resjunk = resjunk;
 
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index f5dcedf6e8..50f5bfda8e 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1661,6 +1661,7 @@ _outTargetEntry(StringInfo str, const TargetEntry *node)
 	WRITE_OID_FIELD(resorigtbl);
 	WRITE_INT_FIELD(resorigcol);
 	WRITE_BOOL_FIELD(resjunk);
+	WRITE_BOOL_FIELD(isdefault);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 4388aae71d..973b558eec 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1315,6 +1315,7 @@ _readTargetEntry(void)
 	READ_OID_FIELD(resorigtbl);
 	READ_INT_FIELD(resorigcol);
 	READ_BOOL_FIELD(resjunk);
+	READ_BOOL_FIELD(isdefault);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/util/tlist.c b/src/backend/optimizer/util/tlist.c
index 89853a0630..7ef1517027 100644
--- a/src/backend/optimizer/util/tlist.c
+++ b/src/backend/optimizer/util/tlist.c
@@ -349,6 +349,7 @@ apply_tlist_labeling(List *dest_tlist, List *src_tlist)
 		dest_tle->resorigtbl = src_tle->resorigtbl;
 		dest_tle->resorigcol = src_tle->resorigcol;
 		dest_tle->resjunk = src_tle->resjunk;
+		dest_tle->isdefault = src_tle->isdefault;
 	}
 }
 
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
index 0672f497c6..3d93e24004 100644
--- a/src/backend/rewrite/rewriteHandler.c
+++ b/src/backend/rewrite/rewriteHandler.c
@@ -987,10 +987,13 @@ rewriteTargetListIU(List *targetList,
 			}
 
 			if (new_expr)
+			{
 				new_tle = makeTargetEntry((Expr *) new_expr,
 										  attrno,
 										  pstrdup(NameStr(att_tup->attname)),
 										  false);
+				new_tle->isdefault = true;
+			}
 		}
 
 		/*
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index d4ce037088..888bd36a07 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -1437,6 +1437,8 @@ typedef struct TargetEntry
 	AttrNumber	resorigcol;		/* column's number in source table */
 	bool		resjunk;		/* set to true to eliminate the attribute from
 								 * final target list */
+	bool		isdefault;		/* true if using the column default, either
+								 * by "DEFAULT" or omission of the column */
 } TargetEntry;
 
 
-- 
2.17.0

>From 1f1647182bb9c2620bde6af054c5fb27a2534f1c Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 5 Dec 2020 08:52:14 -0600
Subject: [PATCH v10 3/4] COPY: flush multi-insert buffer based on accumulated
 size of tuples..

..rather than line length
---
 src/backend/commands/copyfrom.c        | 2 +-
 src/include/executor/nodeModifyTable.h | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 5b8a1e4b61..f36c29ce7f 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -686,7 +686,7 @@ CopyFrom(CopyFromState cstate)
 					/* Add this tuple to the tuple buffer */
 					MultiInsertInfoStore(&cstate->miinfo,
 											 resultRelInfo, myslot,
-											 cstate->line_buf.len,
+											 MemoryContextMemAllocated(myslot->tts_mcxt, true),
 											 cstate->miinfo.cur_lineno);
 
 					/*
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 30542a542a..87b689099c 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -41,10 +41,10 @@ extern void ExecReScanModifyTable(ModifyTableState *node);
 #define MAX_BUFFERED_TUPLES		1000
 
 /*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
+ * Flush buffers if there are >= this many bytes of tuples stored, as counted
+ * by the slot's memory contexts.
  */
-#define MAX_BUFFERED_BYTES		65535
+#define MAX_BUFFERED_BYTES		(1024*1024*8)
 
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
-- 
2.17.0

>From 85cf321bfab8efeb51affa97bb4226646264340e Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Tue, 1 Dec 2020 17:20:25 -0600
Subject: [PATCH v10 4/4] WIP: check tuple size

Or maybe INSERT should flush the buffer based only on the *number* of tuples,
and not their size?
---
 src/backend/executor/nodeModifyTable.c | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index e018946c0e..ec9e43ac55 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -760,7 +760,13 @@ ExecInsert(ModifyTableState *mtstate,
 			batchslot = MultiInsertInfoNextFreeSlot(mtstate->miinfo, resultRelInfo);
 			ExecCopySlot(batchslot, slot);
 
-			MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot, 0, 0); // XXX: tuplen/lineno
+			MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot,
+					// sizeof(void*) * batchslot->tts_nvalid, /* tuple size - underestimate */
+					MemoryContextMemAllocated(batchslot->tts_mcxt, true), /* tuple size */
+					mtstate->ntuples); /* lineno */
+
+			elog(DEBUG2, "bufferedBytes %d; tuples %zu",
+					mtstate->miinfo->bufferedBytes, mtstate->ntuples);
 
 			if (MultiInsertInfoIsFull(mtstate->miinfo))
 				MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo);
-- 
2.17.0
