diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index de5bb9194e..dc7a2b12b5 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2106,7 +2106,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
  * temporary context before calling this, if that's a problem.
  */
 void
-heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate)
 {
 	TransactionId xid = GetCurrentTransactionId();
@@ -2127,11 +2127,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
-	/* Toast and set header data in all the tuples */
+	/* Toast and set header data in all the slots */
 	heaptuples = palloc(ntuples * sizeof(HeapTuple));
 	for (i = 0; i < ntuples; i++)
-		heaptuples[i] = heap_prepare_insert(relation, tuples[i],
-											xid, cid, options);
+	{
+		HeapTuple tuple;
+
+		tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL);
+		slots[i]->tts_tableOid = RelationGetRelid(relation);
+		tuple->t_tableOid = slots[i]->tts_tableOid;
+		heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid,
+											options);
+	}
 
 	/*
 	 * We're about to do the actual inserts -- but check for conflict first,
@@ -2361,13 +2368,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
 	}
 
-	/*
-	 * Copy t_self fields back to the caller's original tuples. This does
-	 * nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's
-	 * probably faster to always copy than check.
-	 */
+	/* copy t_self fields back to the caller's slots */
 	for (i = 0; i < ntuples; i++)
-		tuples[i]->t_self = heaptuples[i]->t_self;
+		slots[i]->tts_tid = heaptuples[i]->t_self;
 
 	pgstat_count_heap_insert(relation, ntuples);
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index cf6e35670b..aa3cb7e7b3 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2010,6 +2010,7 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
+	.multi_insert = heap_multi_insert,
 	.tuple_lock = heapam_tuple_lock,
 
 	.tuple_fetch_row_version = heapam_fetch_row_version,
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 27d3a012af..6bd6e6d68f 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -316,13 +316,12 @@ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 static void EndCopyTo(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
-static void CopyOneRowTo(CopyState cstate,
-			 Datum *values, bool *nulls);
+static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
 					CommandId mycid, int hi_options,
-					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+					ResultRelInfo *resultRelInfo,
 					BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
+					int nBufferedTuples, TupleTableSlot **bufferedSlots,
 					uint64 firstBufferedLineNo);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
@@ -2073,33 +2072,27 @@ CopyTo(CopyState cstate)
 
 	if (cstate->rel)
 	{
-		Datum	   *values;
-		bool	   *nulls;
+		TupleTableSlot *slot;
 		TableScanDesc scandesc;
-		HeapTuple	tuple;
-
-		values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
-		nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
 
 		scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
+		slot = table_slot_create(cstate->rel, NULL);
 
 		processed = 0;
-		while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
+		while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
 		{
 			CHECK_FOR_INTERRUPTS();
 
-			/* Deconstruct the tuple ... faster than repeated heap_getattr */
-			heap_deform_tuple(tuple, tupDesc, values, nulls);
+			/* Deconstruct the tuple ... */
+			slot_getallattrs(slot);
 
 			/* Format and send the data */
-			CopyOneRowTo(cstate, values, nulls);
+			CopyOneRowTo(cstate, slot);
 			processed++;
 		}
 
+		ExecDropSingleTupleTableSlot(slot);
 		table_endscan(scandesc);
-
-		pfree(values);
-		pfree(nulls);
 	}
 	else
 	{
@@ -2125,7 +2118,7 @@ CopyTo(CopyState cstate)
  * Emit one row during CopyTo().
  */
 static void
-CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
+CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
 {
 	bool		need_delim = false;
 	FmgrInfo   *out_functions = cstate->out_functions;
@@ -2142,11 +2135,14 @@ CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
 		CopySendInt16(cstate, list_length(cstate->attnumlist));
 	}
 
+	/* Make sure the tuple is fully deconstructed */
+	slot_getallattrs(slot);
+
 	foreach(cur, cstate->attnumlist)
 	{
 		int			attnum = lfirst_int(cur);
-		Datum		value = values[attnum - 1];
-		bool		isnull = nulls[attnum - 1];
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
 
 		if (!cstate->binary)
 		{
@@ -2305,25 +2301,219 @@ limit_printout_length(const char *str)
 	return res;
 }
 
+#define MAX_BUFFERED_TUPLES		1000
+#define MAX_BUFFERED_BYTES		65535
+#define MAX_PARTITION_BUFFERS	16
+
+typedef struct {
+	Oid		relid;
+	TupleTableSlot **slots;
+	ResultRelInfo *resultRelInfo;
+	int	nused;
+} CopyMultiInsertBuffer;
+
+typedef struct {
+	HTAB *multiInsertBufferTab;
+	CopyMultiInsertBuffer *buffer;
+	int bufferedTuples;
+	int bufferedBytes;
+	int nbuffers;
+} CopyMultiInsertInfo;
+
+
+static void
+CopyMultiInsertInfo_Init(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						 bool partitioned)
+{
+	if (partitioned)
+	{
+		HASHCTL		ctl;
+		HTAB	   *htab;
+
+		memset(&ctl, 0, sizeof(ctl));
+		ctl.keysize = sizeof(Oid);
+		ctl.entrysize = sizeof(CopyMultiInsertBuffer);
+		ctl.hcxt = CurrentMemoryContext;
+
+		htab = hash_create("Copy multi-insert buffer table", MAX_PARTITION_BUFFERS,
+			&ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+		miinfo->multiInsertBufferTab = htab;
+		miinfo->buffer = NULL;
+		miinfo->nbuffers = 0;
+	}
+	else
+	{
+		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
+
+		buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
+		buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
+		buffer->resultRelInfo = rri;
+		buffer->nused = 0;
+		miinfo->multiInsertBufferTab = NULL;
+		miinfo->buffer = buffer;
+		miinfo->nbuffers = 1;
+	}
+
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+}
+
+static inline void
+CopyMultiInsertInfo_SetCurrentBuffer(CopyMultiInsertInfo *miinfo,
+									 ResultRelInfo *rri)
+{
+	CopyMultiInsertBuffer *buffer;
+	Oid		relid = RelationGetRelid(rri->ri_RelationDesc);
+	bool	found;
+
+	Assert(miinfo->multiInsertBufferTab != NULL);
+
+	buffer = hash_search(miinfo->multiInsertBufferTab, &relid, HASH_ENTER, &found);
+
+	if (!found)
+	{
+		buffer->relid = relid;
+		buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
+		buffer->resultRelInfo = rri;
+		buffer->nused = 0;
+		miinfo->nbuffers++;
+	}
+
+	miinfo->buffer = buffer;
+}
+
+static inline bool
+CopyMultiInsertInfo_IsFull(CopyMultiInsertInfo *miinfo)
+{
+	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
+		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES ||
+		miinfo->nbuffers >= MAX_PARTITION_BUFFERS)
+		return true;
+	return false;
+}
+
+static inline bool
+CopyMultiInsertInfo_IsEmpty(CopyMultiInsertInfo *miinfo)
+{
+	return miinfo->bufferedTuples == 0;
+}
+
+static inline void
+CopyMultiInsertInfo_FlushSingleBuffer(CopyMultiInsertBuffer *buffer, CopyState cstate, EState *estate,
+									  CommandId mycid, int hi_options,
+									  BulkInsertState bistate, uint64 firstBufferedLineNo)
+{
+	CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+		buffer->resultRelInfo, bistate,
+		buffer->nused,
+		buffer->slots,
+		firstBufferedLineNo);
+	buffer->nused = 0;
+
+	/*
+	 * We'd better make the bulk insert mechanism gets a new
+	 * buffer when the partition being inserted into changes.
+	 */
+	ReleaseBulkInsertStatePin(bistate);
+}
+
+static inline void
+CopyMultiInsertBuffer_RemoveBuffer(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
+{
+	Oid		relid = buffer->relid;
+	int		i = 0;
+
+	while (buffer->slots[i] != NULL)
+		ExecClearTuple(buffer->slots[i++]);
+	pfree(buffer->slots);
+
+	hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
+				NULL);
+	miinfo->nbuffers--;
+}
+
+
+static inline void
+CopyMultiInsertInfo_Flush(CopyMultiInsertInfo *miinfo, CopyState cstate, EState *estate,
+						  CommandId mycid, int hi_options,
+						  BulkInsertState bistate, uint64 firstBufferedLineNo)
+{
+	if (miinfo->multiInsertBufferTab == NULL)
+		CopyMultiInsertInfo_FlushSingleBuffer(miinfo->buffer, cstate, estate,
+											  mycid, hi_options, bistate,
+											  firstBufferedLineNo);
+	else
+	{
+		HASH_SEQ_STATUS status;
+		CopyMultiInsertBuffer *buffer;
+
+		hash_seq_init(&status, miinfo->multiInsertBufferTab);
+
+		while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
+		{
+			if (buffer->nused > 0)
+			{
+				/* Flush the buffer if it was used */
+				CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate,
+													  mycid, hi_options,
+													  bistate,
+													  firstBufferedLineNo);
+			}
+			else
+			{
+				/*
+				 * Otherwise just remove it.  If we saw no tuples for it this
+				 * batch, then likely its best to make way for buffers for
+				 * other partitions.
+				 */
+				CopyMultiInsertBuffer_RemoveBuffer(miinfo, buffer);
+			}
+		}
+	}
+
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+}
+
+static inline TupleTableSlot *
+CopyMultiInsertInfo_NextFreeSlot(CopyMultiInsertInfo *miinfo,
+								 ResultRelInfo *rri, List **tupleTable)
+{
+	CopyMultiInsertBuffer *buffer = miinfo->buffer;
+
+	if (buffer->slots[buffer->nused] == NULL)
+		buffer->slots[buffer->nused] = table_slot_create(rri->ri_RelationDesc, tupleTable);
+	return buffer->slots[buffer->nused];
+}
+
+static inline void
+CopyMultiInsertInfo_Store(CopyMultiInsertInfo *miinfo, TupleTableSlot *slot,
+						  int tuplen)
+{
+	CopyMultiInsertBuffer *buffer = miinfo->buffer;
+
+	Assert(slot == buffer->slots[buffer->nused]);
+
+	buffer->nused++;
+	miinfo->bufferedTuples++;
+	miinfo->bufferedBytes += tuplen;
+}
+
 /*
  * Copy FROM file to relation.
  */
 uint64
 CopyFrom(CopyState cstate)
 {
-	HeapTuple	tuple;
-	TupleDesc	tupDesc;
-	Datum	   *values;
-	bool	   *nulls;
 	ResultRelInfo *resultRelInfo;
 	ResultRelInfo *target_resultRelInfo;
 	ResultRelInfo *prevResultRelInfo = NULL;
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
-	TupleTableSlot *myslot;
+	TupleTableSlot *singleslot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
-	MemoryContext batchcontext;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
@@ -2331,23 +2521,16 @@ CopyFrom(CopyState cstate)
 	int			hi_options = 0; /* start with default heap_insert options */
 	BulkInsertState bistate;
 	CopyInsertMethod insertMethod;
+	CopyMultiInsertInfo multiInsertInfo;
 	uint64		processed = 0;
-	int			nBufferedTuples = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
-
-#define MAX_BUFFERED_TUPLES 1000
-#define RECHECK_MULTI_INSERT_THRESHOLD 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
 	uint64		firstBufferedLineNo = 0;
-	uint64		lastPartitionSampleLineNo = 0;
-	uint64		nPartitionChanges = 0;
-	double		avgTuplesPerPartChange = 0;
-
 	Assert(cstate->rel);
 
+	memset(&multiInsertInfo, 0, sizeof(CopyMultiInsertInfo));
+
 	/*
 	 * The target must be a plain, foreign, or partitioned relation, or have
 	 * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
@@ -2382,8 +2565,6 @@ CopyFrom(CopyState cstate)
 							RelationGetRelationName(cstate->rel))));
 	}
 
-	tupDesc = RelationGetDescr(cstate->rel);
-
 	/*----------
 	 * Check to see if we can avoid writing WAL
 	 *
@@ -2518,10 +2699,6 @@ CopyFrom(CopyState cstate)
 
 	ExecInitRangeTable(estate, cstate->range_table);
 
-	/* Set up a tuple slot too */
-	myslot = ExecInitExtraTupleSlot(estate, tupDesc,
-									&TTSOpsHeapTuple);
-
 	/*
 	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
 	 * foreign-table result relation(s).
@@ -2565,10 +2742,11 @@ CopyFrom(CopyState cstate)
 												&mtstate->ps);
 
 	/*
-	 * It's more efficient to prepare a bunch of tuples for insertion, and
-	 * insert them in one heap_multi_insert() call, than call heap_insert()
-	 * separately for every tuple. However, there are a number of reasons why
-	 * we might not be able to do this.  These are explained below.
+	 * It's generally more efficient to prepare a bunch of tuples for
+	 * insertion, and insert them in one table_multi_insert() call, than call
+	 * table_insert() separately for every tuple. However, there are a number
+	 * of reasons why we might not be able to do this.  These are explained
+	 * below.
 	 */
 	if (resultRelInfo->ri_TrigDesc != NULL &&
 		(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
@@ -2622,8 +2800,7 @@ CopyFrom(CopyState cstate)
 	{
 		/*
 		 * For partitioned tables, we may still be able to perform bulk
-		 * inserts for sets of consecutive tuples which belong to the same
-		 * partition.  However, the possibility of this depends on which types
+		 * inserts.  However, the possibility of this depends on which types
 		 * of triggers exist on the partition.  We must disable bulk inserts
 		 * if the partition is a foreign table or it has any before row insert
 		 * or insert instead triggers (same as we checked above for the parent
@@ -2632,18 +2809,26 @@ CopyFrom(CopyState cstate)
 		 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
 		 * flag that we must later determine if we can use bulk-inserts for
 		 * the partition being inserted into.
-		 *
-		 * Normally, when performing bulk inserts we just flush the insert
-		 * buffer whenever it becomes full, but for the partitioned table
-		 * case, we flush it whenever the current tuple does not belong to the
-		 * same partition as the previous tuple.
 		 */
 		if (proute)
 			insertMethod = CIM_MULTI_CONDITIONAL;
 		else
 			insertMethod = CIM_MULTI;
 
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		CopyMultiInsertInfo_Init(&multiInsertInfo, resultRelInfo,
+								 proute != NULL);
+	}
+
+	/*
+	 * If not using batch mode (which allocates slots as needed) set up a
+	 * tuple slot too. When inserting into a partitioned table, we also need
+	 * one, even if we might batch insert, to read the tuple in the root
+	 * partition's form.
+	 */
+	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
+	{
+		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+									   &estate->es_tupleTable);
 	}
 
 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
@@ -2660,9 +2845,6 @@ CopyFrom(CopyState cstate)
 	 */
 	ExecBSInsertTriggers(estate, resultRelInfo);
 
-	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-
 	bistate = GetBulkInsertState();
 	econtext = GetPerTupleExprContext(estate);
 
@@ -2672,17 +2854,9 @@ CopyFrom(CopyState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
-	/*
-	 * Set up memory context for batches. For cases without batching we could
-	 * use the per-tuple context, but it's simpler to just use it every time.
-	 */
-	batchcontext = AllocSetContextCreate(CurrentMemoryContext,
-										 "batch context",
-										 ALLOCSET_DEFAULT_SIZES);
-
 	for (;;)
 	{
-		TupleTableSlot *slot;
+		TupleTableSlot *myslot;
 		bool		skip_tuple;
 
 		CHECK_FOR_INTERRUPTS();
@@ -2693,20 +2867,34 @@ CopyFrom(CopyState cstate)
 		 */
 		ResetPerTupleExprContext(estate);
 
+		if (insertMethod == CIM_SINGLE || proute)
+		{
+			myslot = singleslot;
+			Assert(myslot != NULL);
+		}
+		else
+		{
+			Assert(resultRelInfo == target_resultRelInfo);
+			Assert(insertMethod == CIM_MULTI);
+
+			myslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
+													  resultRelInfo,
+													  &estate->es_tupleTable);
+		}
+
 		/*
 		 * Switch to per-tuple context before calling NextCopyFrom, which does
 		 * evaluate default expressions etc. and requires per-tuple context.
 		 */
 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 
-		if (!NextCopyFrom(cstate, econtext, values, nulls))
-			break;
+		ExecClearTuple(myslot);
 
-		/* Switch into per-batch memory context before forming the tuple. */
-		MemoryContextSwitchTo(batchcontext);
+		/* Directly store the values/nulls array in the slot */
+		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+			break;
 
-		/* And now we can form the input tuple. */
-		tuple = heap_form_tuple(tupDesc, values, nulls);
+		ExecStoreVirtualTuple(myslot);
 
 		/*
 		 * Constraints might reference the tableoid column, so (re-)initialize
@@ -2717,18 +2905,15 @@ CopyFrom(CopyState cstate)
 		/* Triggers and stuff need to be invoked in query context. */
 		MemoryContextSwitchTo(oldcontext);
 
-		/* Place tuple in tuple slot --- but slot shouldn't free it */
-		slot = myslot;
-		ExecStoreHeapTuple(tuple, slot, false);
-
 		if (cstate->whereClause)
 		{
 			econtext->ecxt_scantuple = myslot;
+			/* Skip items that don't match the COPY's WHERE clause */
 			if (!ExecQual(cstate->qualexpr, econtext))
 				continue;
 		}
 
-		/* Determine the partition to heap_insert the tuple into */
+		/* Determine the partition to table_insert the tuple into */
 		if (proute)
 		{
 			TupleConversionMap *map;
@@ -2739,80 +2924,10 @@ CopyFrom(CopyState cstate)
 			 * if the found partition is not suitable for INSERTs.
 			 */
 			resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
-											  proute, slot, estate);
+											  proute, myslot, estate);
 
 			if (prevResultRelInfo != resultRelInfo)
 			{
-				/* Check if we can multi-insert into this partition */
-				if (insertMethod == CIM_MULTI_CONDITIONAL)
-				{
-					/*
-					 * When performing bulk-inserts into partitioned tables we
-					 * must insert the tuples seen so far to the heap whenever
-					 * the partition changes.
-					 */
-					if (nBufferedTuples > 0)
-					{
-						MemoryContext	oldcontext;
-
-						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											prevResultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
-
-						/*
-						 * The tuple is already allocated in the batch context, which
-						 * we want to reset.  So to keep the tuple we copy it into the
-						 * short-lived (per-tuple) context, reset the batch context
-						 * and then copy it back into the per-batch one.
-						 */
-						oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/* cleanup the old batch */
-						MemoryContextReset(batchcontext);
-
-						/* copy the tuple back to the per-batch context */
-						oldcontext = MemoryContextSwitchTo(batchcontext);
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/*
-						 * Also push the tuple copy to the slot (resetting the context
-						 * invalidated the slot contents).
-						 */
-						ExecStoreHeapTuple(tuple, slot, false);
-					}
-
-					nPartitionChanges++;
-
-					/*
-					 * Here we adaptively enable multi-inserts based on the
-					 * average number of tuples from recent multi-insert
-					 * batches.  We recalculate the average every
-					 * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
-					 * the average over the whole copy.  This allows us to
-					 * enable multi-inserts when we get periods in the copy
-					 * stream that have tuples commonly belonging to the same
-					 * partition, and disable when the partition is changing
-					 * too often.
-					 */
-					if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
-															   RECHECK_MULTI_INSERT_THRESHOLD)
-								 && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
-					{
-						avgTuplesPerPartChange =
-							(cstate->cur_lineno - lastPartitionSampleLineNo) /
-							(double) nPartitionChanges;
-
-						lastPartitionSampleLineNo = cstate->cur_lineno;
-						nPartitionChanges = 0;
-					}
-				}
-
 				/* Determine which triggers exist on this partition */
 				has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
 											  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
@@ -2821,23 +2936,19 @@ CopyFrom(CopyState cstate)
 											   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
 
 				/*
-				 * Tests have shown that using multi-inserts when the
-				 * partition changes on every tuple slightly decreases the
-				 * performance, however, there are benefits even when only
-				 * some batches have just 2 tuples, so let's enable
-				 * multi-inserts even when the average is quite low.
+				 * Enable multi-inserts when the partition has BEFORE/INSTEAD
+				 * OF triggers, or if the partition is a foreign partition.
 				 */
 				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
-					avgTuplesPerPartChange >= 1.3 &&
-					!has_before_insert_row_trig &&
-					!has_instead_insert_row_trig &&
-					resultRelInfo->ri_FdwRoutine == NULL;
+											!has_before_insert_row_trig &&
+											!has_instead_insert_row_trig &&
+											resultRelInfo->ri_FdwRoutine == NULL;
+
+				/* Set the multi-insert buffer to use for this partition. */
+				if (leafpart_use_multi_insert)
+					CopyMultiInsertInfo_SetCurrentBuffer(&multiInsertInfo,
+														 resultRelInfo);
 
-				/*
-				 * We'd better make the bulk insert mechanism gets a new
-				 * buffer when the partition being inserted into changes.
-				 */
-				ReleaseBulkInsertStatePin(bistate);
 				prevResultRelInfo = resultRelInfo;
 			}
 
@@ -2879,26 +2990,49 @@ CopyFrom(CopyState cstate)
 			 * rowtype.
 			 */
 			map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
-			if (map != NULL)
+			if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
 			{
-				TupleTableSlot *new_slot;
-				MemoryContext oldcontext;
-
-				new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
-				Assert(new_slot != NULL);
-
-				slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
+				/* non batch insert */
+				if (map != NULL)
+				{
+					TupleTableSlot *new_slot;
 
+					new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
+					myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
+				}
+			}
+			else
+			{
 				/*
-				 * Get the tuple in the per-batch context, so that it will be
-				 * freed after each batch insert.
+				 * Batch insert into partitioned table.
 				 */
-				oldcontext = MemoryContextSwitchTo(batchcontext);
-				tuple = ExecCopySlotHeapTuple(slot);
-				MemoryContextSwitchTo(oldcontext);
+				TupleTableSlot *nextslot;
+
+				/* no other path available for partitioned table */
+				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
+
+				nextslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
+															resultRelInfo,
+															&estate->es_tupleTable);
+
+				if (map != NULL)
+					myslot = execute_attr_map_slot(map->attrMap, myslot, nextslot);
+				else
+				{
+					/*
+					 * This looks more expensive than it is (Believe me, I
+					 * optimized it away. Twice). The input is in virtual
+					 * form, and we'll materialize the slot below - for most
+					 * slot types the copy performs the work materialization
+					 * would later require anyway.
+					 */
+					ExecCopySlot(nextslot, myslot);
+					myslot = nextslot;
+				}
 			}
 
-			slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+			/* ensure that triggers etc see the right relation  */
+			myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 		}
 
 		skip_tuple = false;
@@ -2906,7 +3040,7 @@ CopyFrom(CopyState cstate)
 		/* BEFORE ROW INSERT Triggers */
 		if (has_before_insert_row_trig)
 		{
-			if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+			if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
 				skip_tuple = true;	/* "do nothing" */
 		}
 
@@ -2919,7 +3053,7 @@ CopyFrom(CopyState cstate)
 			 */
 			if (has_instead_insert_row_trig)
 			{
-				ExecIRInsertTriggers(estate, resultRelInfo, slot);
+				ExecIRInsertTriggers(estate, resultRelInfo, myslot);
 			}
 			else
 			{
@@ -2931,12 +3065,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
-				{
-					ExecComputeStoredGenerated(estate, slot);
-					MemoryContextSwitchTo(batchcontext);
-					tuple = ExecCopySlotHeapTuple(slot);
-					MemoryContextSwitchTo(oldcontext);
-				}
+					ExecComputeStoredGenerated(estate, myslot);
 
 				/*
 				 * If the target is a plain table, check the constraints of
@@ -2944,7 +3073,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_FdwRoutine == NULL &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr)
-					ExecConstraints(resultRelInfo, slot, estate);
+					ExecConstraints(resultRelInfo, myslot, estate);
 
 				/*
 				 * Also check the tuple against the partition constraint, if
@@ -2954,7 +3083,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_PartitionCheck &&
 					(proute == NULL || has_before_insert_row_trig))
-					ExecPartitionCheck(resultRelInfo, slot, estate, true);
+					ExecPartitionCheck(resultRelInfo, myslot, estate, true);
 
 				/*
 				 * Perform multi-inserts when enabled, or when loading a
@@ -2964,30 +3093,21 @@ CopyFrom(CopyState cstate)
 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
 					/* Add this tuple to the tuple buffer */
-					if (nBufferedTuples == 0)
+					if (CopyMultiInsertInfo_IsEmpty(&multiInsertInfo))
 						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
 
 					/*
-					 * If the buffer filled up, flush it.  Also flush if the
-					 * total size of all the tuples in the buffer becomes
-					 * large, to avoid using large amounts of memory for the
-					 * buffer when the tuples are exceptionally wide.
+					 * The slot previously might point into the per-tuple
+					 * context. For batching it needs to be longer lived.
 					 */
-					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
-					{
-						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
-
-						/* free memory occupied by tuples from the batch */
-						MemoryContextReset(batchcontext);
-					}
+					ExecMaterializeSlot(myslot);
+
+					CopyMultiInsertInfo_Store(&multiInsertInfo, myslot, cstate->line_buf.len);
+
+					/* If the buffer filled up, flush it. */
+					if (CopyMultiInsertInfo_IsFull(&multiInsertInfo))
+						CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate, estate, mycid, hi_options,
+												  bistate, firstBufferedLineNo);
 				}
 				else
 				{
@@ -2996,12 +3116,12 @@ CopyFrom(CopyState cstate)
 					/* OK, store the tuple */
 					if (resultRelInfo->ri_FdwRoutine != NULL)
 					{
-						slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
-																			   resultRelInfo,
-																			   slot,
-																			   NULL);
+						myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
+																				 resultRelInfo,
+																				 myslot,
+																				 NULL);
 
-						if (slot == NULL)	/* "do nothing" */
+						if (myslot == NULL)	/* "do nothing" */
 							continue;	/* next tuple please */
 
 						/*
@@ -3009,27 +3129,26 @@ CopyFrom(CopyState cstate)
 						 * column, so (re-)initialize tts_tableOid before
 						 * evaluating them.
 						 */
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
 					else
 					{
-						tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
-						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
-									mycid, hi_options, bistate);
-						ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						/* OK, store the tuple and create index entries for it */
+						table_insert(resultRelInfo->ri_RelationDesc, myslot,
+									 mycid, hi_options, bistate);
 					}
 
+
 					/* And create index entries for it */
 					if (resultRelInfo->ri_NumIndices > 0)
-						recheckIndexes = ExecInsertIndexTuples(slot,
+						recheckIndexes = ExecInsertIndexTuples(myslot,
 															   estate,
 															   false,
 															   NULL,
 															   NIL);
 
 					/* AFTER ROW INSERT Triggers */
-					ExecARInsertTriggers(estate, resultRelInfo, slot,
+					ExecARInsertTriggers(estate, resultRelInfo, myslot,
 										 recheckIndexes, cstate->transition_capture);
 
 					list_free(recheckIndexes);
@@ -3046,31 +3165,15 @@ CopyFrom(CopyState cstate)
 	}
 
 	/* Flush any remaining buffered tuples */
-	if (nBufferedTuples > 0)
-	{
-		if (insertMethod == CIM_MULTI_CONDITIONAL)
-		{
-			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-								prevResultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
-								firstBufferedLineNo);
-		}
-		else
-			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-								resultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
-								firstBufferedLineNo);
-	}
+	if (insertMethod != CIM_SINGLE && !CopyMultiInsertInfo_IsEmpty(&multiInsertInfo))
+		CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate, estate, mycid, hi_options,
+			bistate, firstBufferedLineNo);
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	FreeBulkInsertState(bistate);
-
 	MemoryContextSwitchTo(oldcontext);
 
-	MemoryContextDelete(batchcontext);
-
 	/*
 	 * In the old protocol, tell pqcomm that we can process normal protocol
 	 * messages again.
@@ -3084,9 +3187,6 @@ CopyFrom(CopyState cstate)
 	/* Handle queued AFTER triggers */
 	AfterTriggerEndQuery(estate);
 
-	pfree(values);
-	pfree(nulls);
-
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
 	/* Allow the FDW to shut down */
@@ -3124,8 +3224,7 @@ CopyFrom(CopyState cstate)
 static void
 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 					int hi_options, ResultRelInfo *resultRelInfo,
-					TupleTableSlot *myslot, BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
+					BulkInsertState bistate, int nBufferedTuples, TupleTableSlot **bufferedSlots,
 					uint64 firstBufferedLineNo)
 {
 	MemoryContext oldcontext;
@@ -3145,12 +3244,12 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(resultRelInfo->ri_RelationDesc,
-					  bufferedTuples,
-					  nBufferedTuples,
-					  mycid,
-					  hi_options,
-					  bistate);
+	table_multi_insert(resultRelInfo->ri_RelationDesc,
+					   bufferedSlots,
+					   nBufferedTuples,
+					   mycid,
+					   hi_options,
+					   bistate);
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
@@ -3164,12 +3263,11 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 			List	   *recheckIndexes;
 
 			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
 			recheckIndexes =
-				ExecInsertIndexTuples(myslot,
-									  estate, false, NULL, NIL);
+				ExecInsertIndexTuples(bufferedSlots[i], estate, false, NULL,
+									  NIL);
 			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
+								 bufferedSlots[i],
 								 recheckIndexes, cstate->transition_capture);
 			list_free(recheckIndexes);
 		}
@@ -3186,13 +3284,15 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 		for (i = 0; i < nBufferedTuples; i++)
 		{
 			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
 			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
+								 bufferedSlots[i],
 								 NIL, cstate->transition_capture);
 		}
 	}
 
+	for (i = 0; i < nBufferedTuples; i++)
+		ExecClearTuple(bufferedSlots[i]);
+
 	/* reset cur_lineno and line_buf_valid to what they were */
 	cstate->line_buf_valid = line_buf_valid;
 	cstate->cur_lineno = save_cur_lineno;
@@ -4995,11 +5095,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 	DR_copy    *myState = (DR_copy *) self;
 	CopyState	cstate = myState->cstate;
 
-	/* Make sure the tuple is fully deconstructed */
-	slot_getallattrs(slot);
-
-	/* And send the data */
-	CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
+	/* Send the data */
+	CopyOneRowTo(cstate, slot);
 	myState->processed++;
 
 	return true;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4c077755d5..ed0e2de144 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -36,6 +36,7 @@
 #define HEAP_INSERT_SPECULATIVE 0x0010
 
 typedef struct BulkInsertStateData *BulkInsertState;
+struct TupleTableSlot;
 
 #define MaxLockTupleMode	LockTupleExclusive
 
@@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
 extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			int options, BulkInsertState bistate);
-extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 873ad15313..3e06d78eea 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -322,6 +322,9 @@ typedef struct TableAmRoutine
 	 * ------------------------------------------------------------------------
 	 */
 
+	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
+								 CommandId cid, int options, struct BulkInsertStateData *bistate);
+
 	/* see table_insert() for reference about parameters */
 	void		(*tuple_insert) (Relation rel, TupleTableSlot *slot, CommandId cid,
 								 int options, struct BulkInsertStateData *bistate);
@@ -1021,6 +1024,17 @@ table_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 										 lockmode, update_indexes);
 }
 
+/*
+ *	table_multi_insert	- insert multiple tuple into a table
+ */
+static inline void
+table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
+				   CommandId cid, int options, struct BulkInsertStateData *bistate)
+{
+	rel->rd_tableam->multi_insert(rel, slots, nslots,
+								  cid, options, bistate);
+}
+
 /*
  * Lock a tuple in the specified mode.
  *
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index dbd7ed0363..6f901dc8a9 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -479,6 +479,12 @@ typedef struct ResultRelInfo
 	/* relation descriptor for root partitioned table */
 	Relation	ri_PartitionRoot;
 
+	/*
+	 * When batch inserting into a table, it might be necessary to have one
+	 * slot per input row, up to the largest possible batch size.
+	 */
+	TupleTableSlot **ri_batchInsertSlots;
+
 	/* Additional information specific to partition tuple routing */
 	struct PartitionRoutingInfo *ri_PartitionInfo;
 } ResultRelInfo;
