diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 05ceb6550d..a395283e06 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2106,7 +2106,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
  * temporary context before calling this, if that's a problem.
  */
 void
-heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate)
 {
 	TransactionId xid = GetCurrentTransactionId();
@@ -2127,11 +2127,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
-	/* Toast and set header data in all the tuples */
+	/* Toast and set header data in all the slots */
 	heaptuples = palloc(ntuples * sizeof(HeapTuple));
 	for (i = 0; i < ntuples; i++)
-		heaptuples[i] = heap_prepare_insert(relation, tuples[i],
-											xid, cid, options);
+	{
+		HeapTuple tuple;
+
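+		/*
+		 * Materialize the slot's contents into a heap tuple, and stamp both
+		 * the slot and the tuple with the target relation's OID.
+		 */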
+		tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL);
+		slots[i]->tts_tableOid = RelationGetRelid(relation);
+		tuple->t_tableOid = slots[i]->tts_tableOid;
+		heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid,
+											options);
+	}
 
 	/*
 	 * We're about to do the actual inserts -- but check for conflict first,
@@ -2361,13 +2368,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
 	}
 
-	/*
-	 * Copy t_self fields back to the caller's original tuples. This does
-	 * nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's
-	 * probably faster to always copy than check.
-	 */
+	/* copy t_self fields back to the caller's slots */
 	for (i = 0; i < ntuples; i++)
-		tuples[i]->t_self = heaptuples[i]->t_self;
+		slots[i]->tts_tid = heaptuples[i]->t_self;
 
 	pgstat_count_heap_insert(relation, ntuples);
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 5c96fc91b7..2f71e45f4b 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2411,6 +2411,7 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
+	.multi_insert = heap_multi_insert,
 	.tuple_lock = heapam_tuple_lock,
 	.finish_bulk_insert = heapam_finish_bulk_insert,
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index c1fd7b78ce..3cd7682bde 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -236,6 +236,52 @@ typedef struct
 } DR_copy;
 
 
+/*
+ * No more than this many tuples per CopyMultiInsertBuffer
+ *
+ * Caution: Don't make this too big, as we could end up with this many
+ * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
+ * multiInsertBuffers list.  Increasing this can cause quadratic growth in
+ * memory requirements during copies into partitioned tables with a large
+ * number of partitions.
+ */
+#define MAX_BUFFERED_TUPLES		1000
+
+/* Consider the buffer full if there are >= this many bytes of tuples stored */
+#define MAX_BUFFERED_BYTES		65535
+
+/* Trim the list of buffers back down to this number after flushing */
+#define MAX_PARTITION_BUFFERS	32
+
+/* Stores multi-insert data related to a single relation in CopyFrom. */
+typedef struct CopyMultiInsertBuffer
+{
+	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for this relation */
+	BulkInsertState bistate;	/* BulkInsertState for this rel */
+	int			nused;			/* number of 'slots' containing tuples */
+	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
+												 * stream */
+} CopyMultiInsertBuffer;
+
+/*
+ * Stores one or many CopyMultiInsertBuffers and details about the size and
+ * number of tuples which are stored in them.  This allows multiple buffers to
+ * exist at once when COPYing into a partitioned table.
+ */
+typedef struct CopyMultiInsertInfo
+{
+	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
+	int			bufferedTuples; /* number of tuples buffered over all buffers */
+	int			bufferedBytes;	/* number of bytes from all buffered tuples */
+	CopyState	cstate;			/* Copy state for this CopyMultiInsertInfo */
+	EState	   *estate;			/* Executor state used for COPY */
+	CommandId	mycid;			/* Command Id used for COPY */
+	int			ti_options;		/* table insert options */
+} CopyMultiInsertInfo;
+
+
 /*
  * These macros centralize code used to process line_buf and raw_buf buffers.
  * They are macros because they often do continue/break control and to avoid
@@ -316,14 +362,7 @@ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 static void EndCopyTo(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
-static void CopyOneRowTo(CopyState cstate,
-			 Datum *values, bool *nulls);
-static void CopyFromInsertBatch(CopyState cstate, EState *estate,
-					CommandId mycid, int ti_options,
-					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
-					BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
-					uint64 firstBufferedLineNo);
+static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
 static int	CopyReadAttributesText(CopyState cstate);
@@ -2073,33 +2112,27 @@ CopyTo(CopyState cstate)
 
 	if (cstate->rel)
 	{
-		Datum	   *values;
-		bool	   *nulls;
+		TupleTableSlot *slot;
 		TableScanDesc scandesc;
-		HeapTuple	tuple;
-
-		values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
-		nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
 
 		scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
+		slot = table_slot_create(cstate->rel, NULL);
 
 		processed = 0;
-		while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
+		while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
 		{
 			CHECK_FOR_INTERRUPTS();
 
-			/* Deconstruct the tuple ... faster than repeated heap_getattr */
-			heap_deform_tuple(tuple, tupDesc, values, nulls);
+			/* Deconstruct the tuple into the slot's values/isnull arrays */
+			slot_getallattrs(slot);
 
 			/* Format and send the data */
-			CopyOneRowTo(cstate, values, nulls);
+			CopyOneRowTo(cstate, slot);
 			processed++;
 		}
 
+		ExecDropSingleTupleTableSlot(slot);
 		table_endscan(scandesc);
-
-		pfree(values);
-		pfree(nulls);
 	}
 	else
 	{
@@ -2125,7 +2158,7 @@ CopyTo(CopyState cstate)
  * Emit one row during CopyTo().
  */
 static void
-CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
+CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
 {
 	bool		need_delim = false;
 	FmgrInfo   *out_functions = cstate->out_functions;
@@ -2142,11 +2175,14 @@ CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
 		CopySendInt16(cstate, list_length(cstate->attnumlist));
 	}
 
+	/* Make sure the tuple is fully deconstructed */
+	slot_getallattrs(slot);
+
 	foreach(cur, cstate->attnumlist)
 	{
 		int			attnum = lfirst_int(cur);
-		Datum		value = values[attnum - 1];
-		bool		isnull = nulls[attnum - 1];
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
 
 		if (!cstate->binary)
 		{
@@ -2305,49 +2341,353 @@ limit_printout_length(const char *str)
 	return res;
 }
 
+/*
+ * CopyMultiInsertBuffer_Init
+ *		Allocate memory and initialize a new CopyMultiInsertBuffer for this
+ *		ResultRelInfo.
+ */
+static CopyMultiInsertBuffer *
+CopyMultiInsertBuffer_Init(ResultRelInfo *rri)
+{
+	CopyMultiInsertBuffer *buffer;
+
+	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
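+	/* slots are created on demand; see CopyMultiInsertInfo_NextFreeSlot() */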
+	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+	buffer->resultRelInfo = rri;
+	buffer->bistate = GetBulkInsertState();
+	buffer->nused = 0;
+
+	return buffer;
+}
+
+/*
+ * CopyMultiInsertInfo_SetupBuffer
+ *		Make a new buffer for this rri.
+ */
+static inline void
+CopyMultiInsertInfo_SetupBuffer(CopyMultiInsertInfo *miinfo,
+								ResultRelInfo *rri)
+{
+	CopyMultiInsertBuffer *buffer;
+
+	buffer = CopyMultiInsertBuffer_Init(rri);
+
+	/* Set up a back-link so we can easily find this buffer again */
+	rri->ri_CopyMultiInsertBuffer = buffer;
+	/* Record that we're tracking this buffer */
+	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+}
+
+/*
+ * CopyMultiInsertInfo_Init
+ *		Initialize an already allocated CopyMultiInsertInfo. If rri is a
+ *		non-partitioned table then a CopyMultiInsertBuffer is set up for
+ *		that table.
+ */
+static void
+CopyMultiInsertInfo_Init(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						 CopyState cstate, EState *estate, CommandId mycid,
+						 int ti_options)
+{
+	miinfo->multiInsertBuffers = NIL;
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+	miinfo->cstate = cstate;
+	miinfo->estate = estate;
+	miinfo->mycid = mycid;
+	miinfo->ti_options = ti_options;
+
+	/*
+	 * Only set up the buffer when not dealing with a partitioned table.
+	 * Buffers for partitioned tables will just be set up when we need to
+	 * send tuples their way for the first time.
+	 */
+	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		CopyMultiInsertInfo_SetupBuffer(miinfo, rri);
+}
+
+/*
+ * CopyMultiInsertInfo_IsFull
+ *		Returns true if the buffers are full
+ */
+static inline bool
+CopyMultiInsertInfo_IsFull(CopyMultiInsertInfo *miinfo)
+{
+	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
+		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+		return true;
+	return false;
+}
+
+/*
+ * CopyMultiInsertInfo_IsEmpty
+ *		Returns true if we have no buffered tuples
+ */
+static inline bool
+CopyMultiInsertInfo_IsEmpty(CopyMultiInsertInfo *miinfo)
+{
+	return miinfo->bufferedTuples == 0;
+}
+
+/*
+ * CopyMultiInsertBuffer_Flush
+ *		Write the tuples stored in 'buffer' out to the table.
+ */
+static inline void
+CopyMultiInsertBuffer_Flush(CopyMultiInsertInfo *miinfo,
+							CopyMultiInsertBuffer *buffer)
+{
+	MemoryContext oldcontext;
+	int			i;
+	uint64		save_cur_lineno;
+	CopyState	cstate = miinfo->cstate;
+	EState	   *estate = miinfo->estate;
+	CommandId	mycid = miinfo->mycid;
+	int			ti_options = miinfo->ti_options;
+	bool		line_buf_valid = cstate->line_buf_valid;
+	int			nused = buffer->nused;
+	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
+	TupleTableSlot **slots = buffer->slots;
+
+	/*
+	 * Print error context information correctly, if one of the operations
+	 * below fails.
+	 */
+	cstate->line_buf_valid = false;
+	save_cur_lineno = cstate->cur_lineno;
+
+	/*
+	 * table_multi_insert may leak memory, so switch to a short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	table_multi_insert(resultRelInfo->ri_RelationDesc,
+					   slots,
+					   nused,
+					   mycid,
+					   ti_options,
+					   buffer->bistate);
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		for (i = 0; i < nused; i++)
+		{
+			List	   *recheckIndexes;
+
+			cstate->cur_lineno = buffer->linenos[i];
+			recheckIndexes =
+				ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
+									  NIL);
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], recheckIndexes,
+								 cstate->transition_capture);
+			list_free(recheckIndexes);
+		}
+	}
+
+	/*
+	 * There are no indexes, but see if we need to run AFTER ROW INSERT
+	 * triggers anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		for (i = 0; i < nused; i++)
+		{
+			cstate->cur_lineno = buffer->linenos[i];
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], NIL, cstate->transition_capture);
+		}
+	}
+
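+	/* Clear the slots' contents, but keep the slots themselves for reuse */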
+	for (i = 0; i < nused; i++)
+		ExecClearTuple(slots[i]);
+
+	/* Mark that all slots are free */
+	buffer->nused = 0;
+
+	/* reset cur_lineno and line_buf_valid to what they were */
+	cstate->line_buf_valid = line_buf_valid;
+	cstate->cur_lineno = save_cur_lineno;
+}
+
+/*
+ * CopyMultiInsertBuffer_Cleanup
+ *		Drop the slots and free the memory used by this buffer.  The
+ *		buffer must be flushed before cleanup.
+ */
+static inline void
+CopyMultiInsertBuffer_Cleanup(CopyMultiInsertBuffer *buffer)
+{
+	int			i;
+
+	/* Ensure buffer was flushed */
+	Assert(buffer->nused == 0);
+
+	/* Remove back-link to ourself */
+	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+
+	FreeBulkInsertState(buffer->bistate);
+
+	/* Since we only create slots on demand, just drop the non-null ones. */
+	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+
+	pfree(buffer);
+}
+
+/*
+ * CopyMultiInsertInfo_Flush
+ *		Write out all stored tuples in all buffers to the tables.
+ *
+ * Once flushed we also trim the tracked buffers list down to size by removing
+ * the buffers created earliest first.
+ *
+ * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
+ * used.  When cleaning up old buffers we'll never remove the one for
+ * 'curr_rri'.
+ */
+static inline void
+CopyMultiInsertInfo_Flush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+{
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+	{
+		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
+
+		CopyMultiInsertBuffer_Flush(miinfo, buffer);
+	}
+
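+	/* All buffers have been flushed, so reset the aggregate counters */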
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+
+	/*
+	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
+	 * remove buffers starting with the ones we created first.  It seems less
+	 * likely that these older ones will be needed again than the ones that
+	 * were just created.
+	 */
+	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+	{
+		CopyMultiInsertBuffer *buffer;
+
+		buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+
+		/*
+		 * We never want to remove the buffer that's currently being used, so
+		 * if we happen to find that then move it to the end of the list.
+		 */
+		if (buffer->resultRelInfo == curr_rri)
+		{
+			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+			buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+		}
+
+		CopyMultiInsertBuffer_Cleanup(buffer);
+		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+	}
+}
+
+/*
+ * CopyMultiInsertInfo_Cleanup
+ *		Cleanup allocated buffers and free memory
+ */
+static inline void
+CopyMultiInsertInfo_Cleanup(CopyMultiInsertInfo *miinfo)
+{
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+		CopyMultiInsertBuffer_Cleanup(lfirst(lc));
+
+	list_free(miinfo->multiInsertBuffers);
+}
+
+/*
+ * CopyMultiInsertInfo_NextFreeSlot
+ *		Get the next TupleTableSlot that the next tuple should be stored in.
+ *
+ * Callers must ensure that the buffer is not full.
+ */
+static inline TupleTableSlot *
+CopyMultiInsertInfo_NextFreeSlot(CopyMultiInsertInfo *miinfo,
+								 ResultRelInfo *rri)
+{
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+	int			nused;
+
+	Assert(buffer != NULL);
+	nused = buffer->nused;
+	Assert(nused < MAX_BUFFERED_TUPLES);
+	if (buffer->slots[nused] == NULL)
+		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
+	return buffer->slots[nused];
+}
+
+/*
+ * CopyMultiInsertInfo_Store
+ *		Consume the TupleTableSlot previously reserved by
+ *		CopyMultiInsertInfo_NextFreeSlot.
+ */
+static inline void
+CopyMultiInsertInfo_Store(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						  TupleTableSlot *slot, int tuplen, uint64 lineno)
+{
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+
+	Assert(buffer != NULL);
+	Assert(slot == buffer->slots[buffer->nused]);
+
+	/* Store the line number so we can properly report any errors later */
+	buffer->linenos[buffer->nused] = lineno;
+
+	/* Record this slot as being used */
+	buffer->nused++;
+
+	/* Update how many tuples are stored and their size */
+	miinfo->bufferedTuples++;
+	miinfo->bufferedBytes += tuplen;
+}
+
 /*
  * Copy FROM file to relation.
  */
 uint64
 CopyFrom(CopyState cstate)
 {
-	HeapTuple	tuple;
-	TupleDesc	tupDesc;
-	Datum	   *values;
-	bool	   *nulls;
 	ResultRelInfo *resultRelInfo;
 	ResultRelInfo *target_resultRelInfo;
 	ResultRelInfo *prevResultRelInfo = NULL;
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
-	TupleTableSlot *myslot;
+	TupleTableSlot *singleslot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
-	MemoryContext batchcontext;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			ti_options = 0; /* start with default table_insert options */
-	BulkInsertState bistate;
+	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
+	CopyMultiInsertInfo multiInsertInfo;
 	uint64		processed = 0;
-	int			nBufferedTuples = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
 
-#define MAX_BUFFERED_TUPLES 1000
-#define RECHECK_MULTI_INSERT_THRESHOLD 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
-	uint64		firstBufferedLineNo = 0;
-	uint64		lastPartitionSampleLineNo = 0;
-	uint64		nPartitionChanges = 0;
-	double		avgTuplesPerPartChange = 0;
-
 	Assert(cstate->rel);
 
+	memset(&multiInsertInfo, 0, sizeof(CopyMultiInsertInfo));
+
 	/*
 	 * The target must be a plain, foreign, or partitioned relation, or have
 	 * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
@@ -2382,8 +2722,6 @@ CopyFrom(CopyState cstate)
 							RelationGetRelationName(cstate->rel))));
 	}
 
-	tupDesc = RelationGetDescr(cstate->rel);
-
 	/*----------
 	 * Check to see if we can avoid writing WAL
 	 *
@@ -2518,10 +2856,6 @@ CopyFrom(CopyState cstate)
 
 	ExecInitRangeTable(estate, cstate->range_table);
 
-	/* Set up a tuple slot too */
-	myslot = ExecInitExtraTupleSlot(estate, tupDesc,
-									&TTSOpsHeapTuple);
-
 	/*
 	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
 	 * foreign-table result relation(s).
@@ -2565,10 +2899,11 @@ CopyFrom(CopyState cstate)
 												&mtstate->ps);
 
 	/*
-	 * It's more efficient to prepare a bunch of tuples for insertion, and
-	 * insert them in one heap_multi_insert() call, than call heap_insert()
-	 * separately for every tuple. However, there are a number of reasons why
-	 * we might not be able to do this.  These are explained below.
+	 * It's generally more efficient to prepare a bunch of tuples for
+	 * insertion, and insert them in one table_multi_insert() call, than call
+	 * table_insert() separately for every tuple. However, there are a number
+	 * of reasons why we might not be able to do this.  These are explained
+	 * below.
 	 */
 	if (resultRelInfo->ri_TrigDesc != NULL &&
 		(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
@@ -2589,8 +2924,8 @@ CopyFrom(CopyState cstate)
 		 * For partitioned tables we can't support multi-inserts when there
 		 * are any statement level insert triggers. It might be possible to
 		 * allow partitioned tables with such triggers in the future, but for
-		 * now, CopyFromInsertBatch expects that any before row insert and
-		 * statement level insert triggers are on the same relation.
+		 * now, CopyMultiInsertInfo_Flush expects that any before row insert
+		 * and statement level insert triggers are on the same relation.
 		 */
 		insertMethod = CIM_SINGLE;
 	}
@@ -2622,8 +2957,7 @@ CopyFrom(CopyState cstate)
 	{
 		/*
 		 * For partitioned tables, we may still be able to perform bulk
-		 * inserts for sets of consecutive tuples which belong to the same
-		 * partition.  However, the possibility of this depends on which types
+		 * inserts.  However, the possibility of this depends on which types
 		 * of triggers exist on the partition.  We must disable bulk inserts
 		 * if the partition is a foreign table or it has any before row insert
 		 * or insert instead triggers (same as we checked above for the parent
@@ -2632,18 +2966,27 @@ CopyFrom(CopyState cstate)
 		 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
 		 * flag that we must later determine if we can use bulk-inserts for
 		 * the partition being inserted into.
-		 *
-		 * Normally, when performing bulk inserts we just flush the insert
-		 * buffer whenever it becomes full, but for the partitioned table
-		 * case, we flush it whenever the current tuple does not belong to the
-		 * same partition as the previous tuple.
 		 */
 		if (proute)
 			insertMethod = CIM_MULTI_CONDITIONAL;
 		else
 			insertMethod = CIM_MULTI;
 
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		CopyMultiInsertInfo_Init(&multiInsertInfo, resultRelInfo, cstate,
+								 estate, mycid, ti_options);
+	}
+
+	/*
+	 * If not using batch mode (which allocates slots as needed), set up a
+	 * tuple slot too.  When inserting into a partitioned table, we also need
+	 * one, even if we might batch insert, to read the tuple in the root
+	 * partition's form.
+	 */
+	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
+	{
+		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+									   &estate->es_tupleTable);
+		bistate = GetBulkInsertState();
 	}
 
 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
@@ -2660,10 +3003,6 @@ CopyFrom(CopyState cstate)
 	 */
 	ExecBSInsertTriggers(estate, resultRelInfo);
 
-	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-
-	bistate = GetBulkInsertState();
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2672,17 +3011,9 @@ CopyFrom(CopyState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
-	/*
-	 * Set up memory context for batches. For cases without batching we could
-	 * use the per-tuple context, but it's simpler to just use it every time.
-	 */
-	batchcontext = AllocSetContextCreate(CurrentMemoryContext,
-										 "batch context",
-										 ALLOCSET_DEFAULT_SIZES);
-
 	for (;;)
 	{
-		TupleTableSlot *slot;
+		TupleTableSlot *myslot;
 		bool		skip_tuple;
 
 		CHECK_FOR_INTERRUPTS();
@@ -2693,20 +3024,33 @@ CopyFrom(CopyState cstate)
 		 */
 		ResetPerTupleExprContext(estate);
 
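+		/*
+		 * When multi-inserting into a non-partitioned table we can fill the
+		 * buffer's next free slot directly; otherwise read into the single
+		 * slot, since a partitioned table's input tuple must first be read
+		 * in the root table's rowtype.
+		 */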
+		if (insertMethod == CIM_SINGLE || proute)
+		{
+			myslot = singleslot;
+			Assert(myslot != NULL);
+		}
+		else
+		{
+			Assert(resultRelInfo == target_resultRelInfo);
+			Assert(insertMethod == CIM_MULTI);
+
+			myslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
+													  resultRelInfo);
+		}
+
 		/*
 		 * Switch to per-tuple context before calling NextCopyFrom, which does
 		 * evaluate default expressions etc. and requires per-tuple context.
 		 */
 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 
-		if (!NextCopyFrom(cstate, econtext, values, nulls))
-			break;
+		ExecClearTuple(myslot);
 
-		/* Switch into per-batch memory context before forming the tuple. */
-		MemoryContextSwitchTo(batchcontext);
+		/* Directly store the values/nulls array in the slot */
+		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+			break;
 
-		/* And now we can form the input tuple. */
-		tuple = heap_form_tuple(tupDesc, values, nulls);
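+		/* Mark the slot as containing a complete virtual tuple */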
+		ExecStoreVirtualTuple(myslot);
 
 		/*
 		 * Constraints might reference the tableoid column, so (re-)initialize
@@ -2717,18 +3061,15 @@ CopyFrom(CopyState cstate)
 		/* Triggers and stuff need to be invoked in query context. */
 		MemoryContextSwitchTo(oldcontext);
 
-		/* Place tuple in tuple slot --- but slot shouldn't free it */
-		slot = myslot;
-		ExecStoreHeapTuple(tuple, slot, false);
-
 		if (cstate->whereClause)
 		{
 			econtext->ecxt_scantuple = myslot;
+			/* Skip items that don't match the COPY's WHERE clause */
 			if (!ExecQual(cstate->qualexpr, econtext))
 				continue;
 		}
 
-		/* Determine the partition to heap_insert the tuple into */
+		/* Determine the partition to table_insert the tuple into */
 		if (proute)
 		{
 			TupleConversionMap *map;
@@ -2739,80 +3080,10 @@ CopyFrom(CopyState cstate)
 			 * if the found partition is not suitable for INSERTs.
 			 */
 			resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
-											  proute, slot, estate);
+											  proute, myslot, estate);
 
 			if (prevResultRelInfo != resultRelInfo)
 			{
-				/* Check if we can multi-insert into this partition */
-				if (insertMethod == CIM_MULTI_CONDITIONAL)
-				{
-					/*
-					 * When performing bulk-inserts into partitioned tables we
-					 * must insert the tuples seen so far to the heap whenever
-					 * the partition changes.
-					 */
-					if (nBufferedTuples > 0)
-					{
-						MemoryContext	oldcontext;
-
-						CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-											prevResultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
-
-						/*
-						 * The tuple is already allocated in the batch context, which
-						 * we want to reset.  So to keep the tuple we copy it into the
-						 * short-lived (per-tuple) context, reset the batch context
-						 * and then copy it back into the per-batch one.
-						 */
-						oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/* cleanup the old batch */
-						MemoryContextReset(batchcontext);
-
-						/* copy the tuple back to the per-batch context */
-						oldcontext = MemoryContextSwitchTo(batchcontext);
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/*
-						 * Also push the tuple copy to the slot (resetting the context
-						 * invalidated the slot contents).
-						 */
-						ExecStoreHeapTuple(tuple, slot, false);
-					}
-
-					nPartitionChanges++;
-
-					/*
-					 * Here we adaptively enable multi-inserts based on the
-					 * average number of tuples from recent multi-insert
-					 * batches.  We recalculate the average every
-					 * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
-					 * the average over the whole copy.  This allows us to
-					 * enable multi-inserts when we get periods in the copy
-					 * stream that have tuples commonly belonging to the same
-					 * partition, and disable when the partition is changing
-					 * too often.
-					 */
-					if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
-															   RECHECK_MULTI_INSERT_THRESHOLD)
-								 && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
-					{
-						avgTuplesPerPartChange =
-							(cstate->cur_lineno - lastPartitionSampleLineNo) /
-							(double) nPartitionChanges;
-
-						lastPartitionSampleLineNo = cstate->cur_lineno;
-						nPartitionChanges = 0;
-					}
-				}
-
 				/* Determine which triggers exist on this partition */
 				has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
 											  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
@@ -2821,23 +3092,22 @@ CopyFrom(CopyState cstate)
 											   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
 
 				/*
-				 * Tests have shown that using multi-inserts when the
-				 * partition changes on every tuple slightly decreases the
-				 * performance, however, there are benefits even when only
-				 * some batches have just 2 tuples, so let's enable
-				 * multi-inserts even when the average is quite low.
+				 * Disable multi-inserts when the partition has BEFORE/INSTEAD
+				 * OF triggers, or if the partition is a foreign partition.
 				 */
 				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
-					avgTuplesPerPartChange >= 1.3 &&
 					!has_before_insert_row_trig &&
 					!has_instead_insert_row_trig &&
 					resultRelInfo->ri_FdwRoutine == NULL;
 
-				/*
-				 * We'd better make the bulk insert mechanism gets a new
-				 * buffer when the partition being inserted into changes.
-				 */
-				ReleaseBulkInsertStatePin(bistate);
+				/* Set up a multi-insert buffer for this partition, if needed. */
+				if (leafpart_use_multi_insert &&
+					resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
+					CopyMultiInsertInfo_SetupBuffer(&multiInsertInfo,
+													resultRelInfo);
+
+				if (bistate != NULL)
+					ReleaseBulkInsertStatePin(bistate);
 				prevResultRelInfo = resultRelInfo;
 			}
 
@@ -2879,26 +3149,48 @@ CopyFrom(CopyState cstate)
 			 * rowtype.
 			 */
 			map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
-			if (map != NULL)
+			if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
 			{
-				TupleTableSlot *new_slot;
-				MemoryContext oldcontext;
-
-				new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
-				Assert(new_slot != NULL);
-
-				slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
+				/* non-batch insert */
+				if (map != NULL)
+				{
+					TupleTableSlot *new_slot;
 
+					new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
+					myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
+				}
+			}
+			else
+			{
 				/*
-				 * Get the tuple in the per-batch context, so that it will be
-				 * freed after each batch insert.
+				 * Batch insert into partitioned table.
 				 */
-				oldcontext = MemoryContextSwitchTo(batchcontext);
-				tuple = ExecCopySlotHeapTuple(slot);
-				MemoryContextSwitchTo(oldcontext);
+				TupleTableSlot *nextslot;
+
+				/* no other path available for partitioned table */
+				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
+
+				nextslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
+															resultRelInfo);
+
+				if (map != NULL)
+					myslot = execute_attr_map_slot(map->attrMap, myslot, nextslot);
+				else
+				{
+					/*
+					 * This looks more expensive than it is (Believe me, I
+					 * optimized it away. Twice). The input is in virtual
+					 * form, and we'll materialize the slot below - for most
+					 * slot types the copy performs the work materialization
+					 * would later require anyway.
+					 */
+					ExecCopySlot(nextslot, myslot);
+					myslot = nextslot;
+				}
 			}
 
-			slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+			/* ensure that triggers etc. see the right relation */
+			myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 		}
 
 		skip_tuple = false;
@@ -2906,7 +3198,7 @@ CopyFrom(CopyState cstate)
 		/* BEFORE ROW INSERT Triggers */
 		if (has_before_insert_row_trig)
 		{
-			if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+			if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
 				skip_tuple = true;	/* "do nothing" */
 		}
 
@@ -2919,7 +3211,7 @@ CopyFrom(CopyState cstate)
 			 */
 			if (has_instead_insert_row_trig)
 			{
-				ExecIRInsertTriggers(estate, resultRelInfo, slot);
+				ExecIRInsertTriggers(estate, resultRelInfo, myslot);
 			}
 			else
 			{
@@ -2931,12 +3223,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
-				{
-					ExecComputeStoredGenerated(estate, slot);
-					MemoryContextSwitchTo(batchcontext);
-					tuple = ExecCopySlotHeapTuple(slot);
-					MemoryContextSwitchTo(oldcontext);
-				}
+					ExecComputeStoredGenerated(estate, myslot);
 
 				/*
 				 * If the target is a plain table, check the constraints of
@@ -2944,7 +3231,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_FdwRoutine == NULL &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr)
-					ExecConstraints(resultRelInfo, slot, estate);
+					ExecConstraints(resultRelInfo, myslot, estate);
 
 				/*
 				 * Also check the tuple against the partition constraint, if
@@ -2954,40 +3241,29 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_PartitionCheck &&
 					(proute == NULL || has_before_insert_row_trig))
-					ExecPartitionCheck(resultRelInfo, slot, estate, true);
+					ExecPartitionCheck(resultRelInfo, myslot, estate, true);
 
-				/*
-				 * Perform multi-inserts when enabled, or when loading a
-				 * partitioned table that can support multi-inserts as
-				 * determined above.
-				 */
+				/* Store the slot in the multi-insert buffer, when enabled. */
 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
+					/*
+					 * The slot previously might point into the per-tuple
+					 * context. For batching it needs to be longer lived.
+					 */
+					ExecMaterializeSlot(myslot);
+
 					/* Add this tuple to the tuple buffer */
-					if (nBufferedTuples == 0)
-						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
+					CopyMultiInsertInfo_Store(&multiInsertInfo,
+											  resultRelInfo, myslot,
+											  cstate->line_buf.len,
+											  cstate->cur_lineno);
 
 					/*
-					 * If the buffer filled up, flush it.  Also flush if the
-					 * total size of all the tuples in the buffer becomes
-					 * large, to avoid using large amounts of memory for the
-					 * buffer when the tuples are exceptionally wide.
+					 * If the buffer is now full then flush all buffers out to
+					 * their tables.
 					 */
-					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
-					{
-						CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
-
-						/* free memory occupied by tuples from the batch */
-						MemoryContextReset(batchcontext);
-					}
+					if (CopyMultiInsertInfo_IsFull(&multiInsertInfo))
+						CopyMultiInsertInfo_Flush(&multiInsertInfo, resultRelInfo);
 				}
 				else
 				{
@@ -2996,12 +3272,12 @@ CopyFrom(CopyState cstate)
 					/* OK, store the tuple */
 					if (resultRelInfo->ri_FdwRoutine != NULL)
 					{
-						slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
-																			   resultRelInfo,
-																			   slot,
-																			   NULL);
+						myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
+																				 resultRelInfo,
+																				 myslot,
+																				 NULL);
 
-						if (slot == NULL)	/* "do nothing" */
+						if (myslot == NULL) /* "do nothing" */
 							continue;	/* next tuple please */
 
 						/*
@@ -3009,27 +3285,25 @@ CopyFrom(CopyState cstate)
 						 * column, so (re-)initialize tts_tableOid before
 						 * evaluating them.
 						 */
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
 					else
 					{
-						tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
-						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
-									mycid, ti_options, bistate);
-						ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						/* OK, store the tuple */
+						table_insert(resultRelInfo->ri_RelationDesc, myslot,
+									 mycid, ti_options, bistate);
 					}
 
 					/* And create index entries for it */
 					if (resultRelInfo->ri_NumIndices > 0)
-						recheckIndexes = ExecInsertIndexTuples(slot,
+						recheckIndexes = ExecInsertIndexTuples(myslot,
 															   estate,
 															   false,
 															   NULL,
 															   NIL);
 
 					/* AFTER ROW INSERT Triggers */
-					ExecARInsertTriggers(estate, resultRelInfo, slot,
+					ExecARInsertTriggers(estate, resultRelInfo, myslot,
 										 recheckIndexes, cstate->transition_capture);
 
 					list_free(recheckIndexes);
@@ -3045,32 +3319,24 @@ CopyFrom(CopyState cstate)
 		}
 	}
 
-	/* Flush any remaining buffered tuples */
-	if (nBufferedTuples > 0)
+	if (insertMethod != CIM_SINGLE)
 	{
-		if (insertMethod == CIM_MULTI_CONDITIONAL)
-		{
-			CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-								prevResultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
-								firstBufferedLineNo);
-		}
-		else
-			CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-								resultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
-								firstBufferedLineNo);
+		/* Flush any remaining buffered tuples */
+		if (!CopyMultiInsertInfo_IsEmpty(&multiInsertInfo))
+			CopyMultiInsertInfo_Flush(&multiInsertInfo, NULL);
+
+		/* Tear down the multi-insert buffer data */
+		CopyMultiInsertInfo_Cleanup(&multiInsertInfo);
 	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	FreeBulkInsertState(bistate);
+	if (bistate != NULL)
+		FreeBulkInsertState(bistate);
 
 	MemoryContextSwitchTo(oldcontext);
 
-	MemoryContextDelete(batchcontext);
-
 	/*
 	 * In the old protocol, tell pqcomm that we can process normal protocol
 	 * messages again.
@@ -3084,9 +3350,6 @@ CopyFrom(CopyState cstate)
 	/* Handle queued AFTER triggers */
 	AfterTriggerEndQuery(estate);
 
-	pfree(values);
-	pfree(nulls);
-
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
 	/* Allow the FDW to shut down */
@@ -3111,88 +3374,6 @@ CopyFrom(CopyState cstate)
 	return processed;
 }
 
-/*
- * A subroutine of CopyFrom, to write the current batch of buffered heap
- * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
- * triggers.
- */
-static void
-CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
-					int ti_options, ResultRelInfo *resultRelInfo,
-					TupleTableSlot *myslot, BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
-					uint64 firstBufferedLineNo)
-{
-	MemoryContext oldcontext;
-	int			i;
-	uint64		save_cur_lineno;
-	bool		line_buf_valid = cstate->line_buf_valid;
-
-	/*
-	 * Print error context information correctly, if one of the operations
-	 * below fail.
-	 */
-	cstate->line_buf_valid = false;
-	save_cur_lineno = cstate->cur_lineno;
-
-	/*
-	 * heap_multi_insert leaks memory, so switch to short-lived memory context
-	 * before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(resultRelInfo->ri_RelationDesc,
-					  bufferedTuples,
-					  nBufferedTuples,
-					  mycid,
-					  ti_options,
-					  bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	/*
-	 * If there are any indexes, update them for all the inserted tuples, and
-	 * run AFTER ROW INSERT triggers.
-	 */
-	if (resultRelInfo->ri_NumIndices > 0)
-	{
-		for (i = 0; i < nBufferedTuples; i++)
-		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
-			recheckIndexes =
-				ExecInsertIndexTuples(myslot,
-									  estate, false, NULL, NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
-								 recheckIndexes, cstate->transition_capture);
-			list_free(recheckIndexes);
-		}
-	}
-
-	/*
-	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
-	 * anyway.
-	 */
-	else if (resultRelInfo->ri_TrigDesc != NULL &&
-			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-	{
-		for (i = 0; i < nBufferedTuples; i++)
-		{
-			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
-								 NIL, cstate->transition_capture);
-		}
-	}
-
-	/* reset cur_lineno and line_buf_valid to what they were */
-	cstate->line_buf_valid = line_buf_valid;
-	cstate->cur_lineno = save_cur_lineno;
-}
-
 /*
  * Setup to read tuples from a file for COPY FROM.
  *
@@ -4990,11 +5171,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 	DR_copy    *myState = (DR_copy *) self;
 	CopyState	cstate = myState->cstate;
 
-	/* Make sure the tuple is fully deconstructed */
-	slot_getallattrs(slot);
-
-	/* And send the data */
-	CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
+	/* Send the data */
+	CopyOneRowTo(cstate, slot);
 	myState->processed++;
 
 	return true;
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 03dcc7b820..602a08e585 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1346,6 +1346,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
 	resultRelInfo->ri_PartitionCheck = partition_check;
 	resultRelInfo->ri_PartitionRoot = partition_root;
 	resultRelInfo->ri_PartitionInfo = NULL; /* may be set later */
+	resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
 }
 
 /*
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index b72db85aab..50800aa5ca 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -947,6 +947,7 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
 		partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
 
 	partRelInfo->ri_PartitionInfo = partrouteinfo;
+	partRelInfo->ri_CopyMultiInsertBuffer = NULL;
 
 	/*
 	 * Keep track of it in the PartitionTupleRouting->partitions array.
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4c077755d5..ed0e2de144 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -36,6 +36,7 @@
 #define HEAP_INSERT_SPECULATIVE 0x0010
 
 typedef struct BulkInsertStateData *BulkInsertState;
+struct TupleTableSlot;
 
 #define MaxLockTupleMode	LockTupleExclusive
 
@@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
 extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			int options, BulkInsertState bistate);
-extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 4efe178ed1..c2fdedc551 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -328,6 +328,9 @@ typedef struct TableAmRoutine
 	 * ------------------------------------------------------------------------
 	 */
 
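+	/* see table_multi_insert() for reference about parameters */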
+	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
+								 CommandId cid, int options, struct BulkInsertStateData *bistate);
+
 	/* see table_insert() for reference about parameters */
 	void		(*tuple_insert) (Relation rel, TupleTableSlot *slot,
 								 CommandId cid, int options,
@@ -1157,6 +1160,17 @@ table_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 										 lockmode, update_indexes);
 }
 
+/*
+ *	table_multi_insert	- insert multiple tuples into a table
+ *
+ *	Like table_insert(), but inserts 'nslots' tuples, passed as an array of
+ *	TupleTableSlots in 'slots', in a single operation.
+ */
+static inline void
+table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
+				   CommandId cid, int options, struct BulkInsertStateData *bistate)
+{
+	rel->rd_tableam->multi_insert(rel, slots, nslots,
+								  cid, options, bistate);
+}
+
 /*
  * Lock a tuple in the specified mode.
  *
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 5b4ea6c235..7e089e20e2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -40,7 +40,7 @@ struct ExprState;
 struct ExprContext;
 struct RangeTblEntry;			/* avoid including parsenodes.h here */
 struct ExprEvalStep;			/* avoid including execExpr.h everywhere */
-
+struct CopyMultiInsertBuffer;	/* defined in commands/copy.c */
 
 /* ----------------
  *		ExprState node
@@ -481,6 +481,9 @@ typedef struct ResultRelInfo
 
 	/* Additional information specific to partition tuple routing */
 	struct PartitionRoutingInfo *ri_PartitionInfo;
+
+	/* For use by copy.c when performing multi-inserts */
+	struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
 } ResultRelInfo;
 
 /* ----------------
