From 29ff4fbd97386eb0f2ab8a9bc80420c74fc4fd82 Mon Sep 17 00:00:00 2001
From: Takayuki Tsunakawa <tsunakawa.takay@fujitsu.com>
Date: Tue, 10 Nov 2020 09:27:56 +0900
Subject: [PATCH v1] Add bulk insert for foreign tables

---
 contrib/postgres_fdw/deparse.c         |   3 +-
 contrib/postgres_fdw/postgres_fdw.c    | 233 ++++++++++++++++++++++++++-------
 contrib/postgres_fdw/postgres_fdw.h    |   2 +-
 doc/src/sgml/fdwhandler.sgml           |  64 ++++++++-
 src/backend/executor/nodeModifyTable.c | 135 +++++++++++++++++++
 src/include/foreign/fdwapi.h           |   7 +
 src/include/nodes/execnodes.h          |   6 +
 7 files changed, 395 insertions(+), 55 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 2d44df1..5aa81db 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1706,7 +1706,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 				 Index rtindex, Relation rel,
 				 List *targetAttrs, bool doNothing,
 				 List *withCheckOptionList, List *returningList,
-				 List **retrieved_attrs)
+				 List **retrieved_attrs, int *values_end_len)
 {
 	AttrNumber	pindex;
 	bool		first;
@@ -1749,6 +1749,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 	}
 	else
 		appendStringInfoString(buf, " DEFAULT VALUES");
+	*values_end_len = buf->len;
 
 	if (doNothing)
 		appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9c5aaac..f7be4be 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -86,8 +86,10 @@ enum FdwScanPrivateIndex
  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
  * 2) Integer list of target attribute numbers for INSERT/UPDATE
  *	  (NIL for a DELETE)
- * 3) Boolean flag showing if the remote query has a RETURNING clause
- * 4) Integer list of attribute numbers retrieved by RETURNING, if any
+ * 3) Length till the end of VALUES clause for INSERT
+ *	  (-1 for a DELETE/UPDATE)
+ * 4) Boolean flag showing if the remote query has a RETURNING clause
+ * 5) Integer list of attribute numbers retrieved by RETURNING, if any
  */
 enum FdwModifyPrivateIndex
 {
@@ -95,6 +97,8 @@ enum FdwModifyPrivateIndex
 	FdwModifyPrivateUpdateSql,
 	/* Integer list of target attribute numbers for INSERT/UPDATE */
 	FdwModifyPrivateTargetAttnums,
+	/* Length till the end of VALUES clause (as an integer Value node) */
+	FdwModifyPrivateLen,
 	/* has-returning flag (as an integer Value node) */
 	FdwModifyPrivateHasReturning,
 	/* Integer list of attribute numbers retrieved by RETURNING */
@@ -175,7 +179,9 @@ typedef struct PgFdwModifyState
 
 	/* extracted fdw_private data */
 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
+	char	   *orig_query;		/* original text of INSERT command */
 	List	   *target_attrs;	/* list of target attribute numbers */
+	int			len;			/* length of some part of query */
 	bool		has_returning;	/* is there a RETURNING clause? */
 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
 
@@ -184,6 +190,9 @@ typedef struct PgFdwModifyState
 	int			p_nums;			/* number of parameters to transmit */
 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
 
+	/* bulk operation stuff */
+	int			num_slots;		/* number of slots to insert */
+
 	/* working memory context */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
@@ -342,6 +351,11 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
 												 TupleTableSlot *planSlot);
+static TupleTableSlot **postgresExecForeignBulkInsert(EState *estate,
+												 ResultRelInfo *resultRelInfo,
+												 TupleTableSlot **slots,
+												 TupleTableSlot **planSlots,
+												 int *numSlots);
 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
@@ -428,20 +442,23 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
 											   Plan *subplan,
 											   char *query,
 											   List *target_attrs,
+											   int len,
 											   bool has_returning,
 											   List *retrieved_attrs);
-static TupleTableSlot *execute_foreign_modify(EState *estate,
+static TupleTableSlot **execute_foreign_modify(EState *estate,
 											  ResultRelInfo *resultRelInfo,
 											  CmdType operation,
-											  TupleTableSlot *slot,
-											  TupleTableSlot *planSlot);
+											  TupleTableSlot **slots,
+											  TupleTableSlot **planSlots,
+											  int *numSlots);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 											 ItemPointer tupleid,
-											 TupleTableSlot *slot);
+											 TupleTableSlot **slots,
+											 int numSlots);
 static void store_returning_result(PgFdwModifyState *fmstate,
 								   TupleTableSlot *slot, PGresult *res);
-static void finish_foreign_modify(PgFdwModifyState *fmstate);
+static void finish_foreign_modify(PgFdwModifyState *fmstate, bool release_conn);
 static List *build_remote_returning(Index rtindex, Relation rel,
 									List *returningList);
 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -529,6 +546,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->PlanForeignModify = postgresPlanForeignModify;
 	routine->BeginForeignModify = postgresBeginForeignModify;
 	routine->ExecForeignInsert = postgresExecForeignInsert;
+	routine->ExecForeignBulkInsert = postgresExecForeignBulkInsert;
 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
 	routine->ExecForeignDelete = postgresExecForeignDelete;
 	routine->EndForeignModify = postgresEndForeignModify;
@@ -1663,7 +1681,9 @@ postgresPlanForeignModify(PlannerInfo *root,
 	List	   *withCheckOptionList = NIL;
 	List	   *returningList = NIL;
 	List	   *retrieved_attrs = NIL;
+	List	   *retvalList;
 	bool		doNothing = false;
+	int			values_end_len = -1;
 
 	initStringInfo(&sql);
 
@@ -1751,7 +1771,7 @@ postgresPlanForeignModify(PlannerInfo *root,
 			deparseInsertSql(&sql, rte, resultRelation, rel,
 							 targetAttrs, doNothing,
 							 withCheckOptionList, returningList,
-							 &retrieved_attrs);
+							 &retrieved_attrs, &values_end_len);
 			break;
 		case CMD_UPDATE:
 			deparseUpdateSql(&sql, rte, resultRelation, rel,
@@ -1775,10 +1795,12 @@ postgresPlanForeignModify(PlannerInfo *root,
 	 * Build the fdw_private list that will be available to the executor.
 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
 	 */
-	return list_make4(makeString(sql.data),
+	retvalList = list_make4(makeString(sql.data),
 					  targetAttrs,
-					  makeInteger((retrieved_attrs != NIL)),
-					  retrieved_attrs);
+					  makeInteger(values_end_len),
+					  makeInteger((retrieved_attrs != NIL)));
+	retvalList = lappend(retvalList, retrieved_attrs);
+	return retvalList;
 }
 
 /*
@@ -1796,6 +1818,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	char	   *query;
 	List	   *target_attrs;
 	bool		has_returning;
+	int			values_end_len;
 	List	   *retrieved_attrs;
 	RangeTblEntry *rte;
 
@@ -1811,6 +1834,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 							FdwModifyPrivateUpdateSql));
 	target_attrs = (List *) list_nth(fdw_private,
 									 FdwModifyPrivateTargetAttnums);
+	values_end_len = intVal(list_nth(fdw_private,
+									FdwModifyPrivateLen));
 	has_returning = intVal(list_nth(fdw_private,
 									FdwModifyPrivateHasReturning));
 	retrieved_attrs = (List *) list_nth(fdw_private,
@@ -1828,6 +1853,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 									mtstate->mt_plans[subplan_index]->plan,
 									query,
 									target_attrs,
+									values_end_len,
 									has_returning,
 									retrieved_attrs);
 
@@ -1845,7 +1871,37 @@ postgresExecForeignInsert(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	TupleTableSlot *rslot;
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	/*
+	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
+	 * postgresBeginForeignInsert())
+	 */
+	if (fmstate->aux_fmstate)
+		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
+								   &slot, &planSlot, &numSlots);
+	/* Revert that change */
+	if (fmstate->aux_fmstate)
+		resultRelInfo->ri_FdwState = fmstate;
+
+	return rslot ? *rslot : NULL;
+}
+
+/*
+ * postgresExecForeignBulkInsert
+ *		Insert multiple rows into a foreign table
+ */
+static TupleTableSlot **
+postgresExecForeignBulkInsert(EState *estate,
+						  ResultRelInfo *resultRelInfo,
+						  TupleTableSlot **slots,
+						  TupleTableSlot **planSlots,
+						  int *numSlots)
+{
+	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+	TupleTableSlot **rslot;
 
 	/*
 	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
@@ -1854,7 +1910,7 @@ postgresExecForeignInsert(EState *estate,
 	if (fmstate->aux_fmstate)
 		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
 	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
-								   slot, planSlot);
+								   slots, planSlots, numSlots);
 	/* Revert that change */
 	if (fmstate->aux_fmstate)
 		resultRelInfo->ri_FdwState = fmstate;
@@ -1872,8 +1928,13 @@ postgresExecForeignUpdate(EState *estate,
 						  TupleTableSlot *slot,
 						  TupleTableSlot *planSlot)
 {
-	return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
-								  slot, planSlot);
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
+								  &slot, &planSlot, &numSlots);
+
+	return rslot ? *rslot : NULL;
 }
 
 /*
@@ -1886,8 +1947,13 @@ postgresExecForeignDelete(EState *estate,
 						  TupleTableSlot *slot,
 						  TupleTableSlot *planSlot)
 {
-	return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
-								  slot, planSlot);
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
+								  &slot, &planSlot, &numSlots);
+
+	return rslot ? *rslot : NULL;
 }
 
 /*
@@ -1905,7 +1971,7 @@ postgresEndForeignModify(EState *estate,
 		return;
 
 	/* Destroy the execution state */
-	finish_foreign_modify(fmstate);
+	finish_foreign_modify(fmstate, true);
 }
 
 /*
@@ -1924,6 +1990,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	RangeTblEntry *rte;
 	TupleDesc	tupdesc = RelationGetDescr(rel);
 	int			attnum;
+	int			values_end_len;
 	StringInfoData sql;
 	List	   *targetAttrs = NIL;
 	List	   *retrieved_attrs = NIL;
@@ -2000,7 +2067,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
 					 resultRelInfo->ri_WithCheckOptions,
 					 resultRelInfo->ri_returningList,
-					 &retrieved_attrs);
+					 &retrieved_attrs, &values_end_len);
 
 	/* Construct an execution state. */
 	fmstate = create_foreign_modify(mtstate->ps.state,
@@ -2010,6 +2077,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 									NULL,
 									sql.data,
 									targetAttrs,
+									values_end_len,
 									retrieved_attrs != NIL,
 									retrieved_attrs);
 
@@ -2048,7 +2116,7 @@ postgresEndForeignInsert(EState *estate,
 		fmstate = fmstate->aux_fmstate;
 
 	/* Destroy the execution state */
-	finish_foreign_modify(fmstate);
+	finish_foreign_modify(fmstate, true);
 }
 
 /*
@@ -3538,6 +3606,7 @@ create_foreign_modify(EState *estate,
 					  Plan *subplan,
 					  char *query,
 					  List *target_attrs,
+					  int len,
 					  bool has_returning,
 					  List *retrieved_attrs)
 {
@@ -3572,7 +3641,10 @@ create_foreign_modify(EState *estate,
 
 	/* Set up remote query information. */
 	fmstate->query = query;
+	if (operation == CMD_INSERT)
+		fmstate->orig_query = pstrdup(fmstate->query);
 	fmstate->target_attrs = target_attrs;
+	fmstate->len = len;
 	fmstate->has_returning = has_returning;
 	fmstate->retrieved_attrs = retrieved_attrs;
 
@@ -3624,6 +3696,8 @@ create_foreign_modify(EState *estate,
 
 	Assert(fmstate->p_nums <= n_params);
 
+	fmstate->num_slots = 1;
+
 	/* Initialize auxiliary state */
 	fmstate->aux_fmstate = NULL;
 
@@ -3634,26 +3708,75 @@ create_foreign_modify(EState *estate,
  * execute_foreign_modify
  *		Perform foreign-table modification as required, and fetch RETURNING
  *		result if any.  (This is the shared guts of postgresExecForeignInsert,
- *		postgresExecForeignUpdate, and postgresExecForeignDelete.)
+ *		postgresExecForeignBulkInsert, postgresExecForeignUpdate, and
+ *		postgresExecForeignDelete.)
  */
-static TupleTableSlot *
+static TupleTableSlot **
 execute_foreign_modify(EState *estate,
 					   ResultRelInfo *resultRelInfo,
 					   CmdType operation,
-					   TupleTableSlot *slot,
-					   TupleTableSlot *planSlot)
+					   TupleTableSlot **slots,
+					   TupleTableSlot **planSlots,
+					   int *numSlots)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
 	ItemPointer ctid = NULL;
 	const char **p_values;
 	PGresult   *res;
 	int			n_rows;
+	int			i, j;
+	int			pindex;
+	bool		first;
+	StringInfoData sql;
 
 	/* The operation should be INSERT, UPDATE, or DELETE */
 	Assert(operation == CMD_INSERT ||
 		   operation == CMD_UPDATE ||
 		   operation == CMD_DELETE);
 
+	if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
+	{
+		/* Destroy the prepared statement created previously */
+		if (fmstate->p_name)
+			finish_foreign_modify(fmstate, false);
+
+		/*
+		 * Recreate INSERT command string with numSlots records in its
+		 * VALUES clause
+		 */
+
+		/* Copy up to the end of the first record from the original query */
+		initStringInfo(&sql);
+		appendBinaryStringInfo(&sql, fmstate->orig_query, fmstate->len);
+
+		/* Add records to VALUES clause */
+		pindex = fmstate->p_nums + 1;
+		for (i = 0; i < *numSlots - 1; i++)
+		{
+			appendStringInfoString(&sql, ", (");
+
+			first = true;
+			for (j = 0; j < fmstate->p_nums; j++)
+			{
+				if (!first)
+					appendStringInfoString(&sql, ", ");
+				first = false;
+
+				appendStringInfo(&sql, "$%d", pindex);
+				pindex++;
+			}
+
+			appendStringInfoChar(&sql, ')');
+		}
+
+		/* Copy stuff after VALUES clause from the original query */
+		appendStringInfoString(&sql, fmstate->orig_query + fmstate->len);
+
+		pfree(fmstate->query);
+		fmstate->query = sql.data;
+		fmstate->num_slots = *numSlots;
+	}
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -3666,7 +3789,7 @@ execute_foreign_modify(EState *estate,
 		Datum		datum;
 		bool		isNull;
 
-		datum = ExecGetJunkAttribute(planSlot,
+		datum = ExecGetJunkAttribute(planSlots[0],
 									 fmstate->ctidAttno,
 									 &isNull);
 		/* shouldn't ever get a null result... */
@@ -3676,14 +3799,14 @@ execute_foreign_modify(EState *estate,
 	}
 
 	/* Convert parameters needed by prepared statement to text form */
-	p_values = convert_prep_stmt_params(fmstate, ctid, slot);
+	p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
 
 	/*
 	 * Execute the prepared statement.
 	 */
 	if (!PQsendQueryPrepared(fmstate->conn,
 							 fmstate->p_name,
-							 fmstate->p_nums,
+							 fmstate->p_nums * (*numSlots),
 							 p_values,
 							 NULL,
 							 NULL,
@@ -3704,9 +3827,10 @@ execute_foreign_modify(EState *estate,
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
 	{
+		Assert(*numSlots == 1);
 		n_rows = PQntuples(res);
 		if (n_rows > 0)
-			store_returning_result(fmstate, slot, res);
+			store_returning_result(fmstate, slots[0], res);
 	}
 	else
 		n_rows = atoi(PQcmdTuples(res));
@@ -3716,10 +3840,12 @@ execute_foreign_modify(EState *estate,
 
 	MemoryContextReset(fmstate->temp_cxt);
 
+	*numSlots = n_rows;
+
 	/*
 	 * Return NULL if nothing was inserted/updated/deleted on the remote end
 	 */
-	return (n_rows > 0) ? slot : NULL;
+	return (n_rows > 0) ? slots : NULL;
 }
 
 /*
@@ -3779,19 +3905,23 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
-						 TupleTableSlot *slot)
+						 TupleTableSlot **slots,
+						 int numSlots)
 {
 	const char **p_values;
+	int			i;
+	int			j;
 	int			pindex = 0;
 	MemoryContext oldcontext;
 
 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
 
-	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
+	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
 
 	/* 1st parameter should be ctid, if it's in use */
 	if (tupleid != NULL)
 	{
+		Assert(numSlots == 1);
 		/* don't need set_transmission_modes for TID output */
 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
 											  PointerGetDatum(tupleid));
@@ -3799,32 +3929,37 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
 	}
 
 	/* get following parameters from slot */
-	if (slot != NULL && fmstate->target_attrs != NIL)
+	if (slots != NULL && fmstate->target_attrs != NIL)
 	{
 		int			nestlevel;
 		ListCell   *lc;
 
 		nestlevel = set_transmission_modes();
 
-		foreach(lc, fmstate->target_attrs)
+		for (i = 0; i < numSlots; i++)
 		{
-			int			attnum = lfirst_int(lc);
-			Datum		value;
-			bool		isnull;
+			j = (tupleid != NULL) ? 1 : 0;
+			foreach(lc, fmstate->target_attrs)
+			{
+				int			attnum = lfirst_int(lc);
+				Datum		value;
+				bool		isnull;
 
-			value = slot_getattr(slot, attnum, &isnull);
-			if (isnull)
-				p_values[pindex] = NULL;
-			else
-				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
-													  value);
-			pindex++;
+				value = slot_getattr(slots[i], attnum, &isnull);
+				if (isnull)
+					p_values[pindex] = NULL;
+				else
+					p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
+														  value);
+				pindex++;
+				j++;
+			}
 		}
 
 		reset_transmission_modes(nestlevel);
 	}
 
-	Assert(pindex == fmstate->p_nums);
+	Assert(pindex == fmstate->p_nums * numSlots);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -3873,7 +4008,8 @@ store_returning_result(PgFdwModifyState *fmstate,
  *		Release resources for a foreign insert/update/delete operation
  */
 static void
-finish_foreign_modify(PgFdwModifyState *fmstate)
+finish_foreign_modify(PgFdwModifyState *fmstate,
+	bool release_conn)
 {
 	Assert(fmstate != NULL);
 
@@ -3897,8 +4033,11 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
 	}
 
 	/* Release remote connection */
-	ReleaseConnection(fmstate->conn);
-	fmstate->conn = NULL;
+	if (release_conn)
+	{
+		ReleaseConnection(fmstate->conn);
+		fmstate->conn = NULL;
+	}
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410d..459a9ca 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -161,7 +161,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs, bool doNothing,
 							 List *withCheckOptionList, List *returningList,
-							 List **retrieved_attrs);
+							 List **retrieved_attrs, int *values_end_len);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 9c92934..cdf2959 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -523,8 +523,9 @@ BeginForeignModify(ModifyTableState *mtstate,
      Begin executing a foreign table modification operation.  This routine is
      called during executor startup.  It should perform any initialization
      needed prior to the actual table modifications.  Subsequently,
-     <function>ExecForeignInsert</function>, <function>ExecForeignUpdate</function> or
-     <function>ExecForeignDelete</function> will be called for each tuple to be
+     <function>ExecForeignInsert/ExecForeignBulkInsert</function>,
+     <function>ExecForeignUpdate</function> or
+     <function>ExecForeignDelete</function> will be called for tuple(s) to be
      inserted, updated, or deleted.
     </para>
 
@@ -614,6 +615,56 @@ ExecForeignInsert(EState *estate,
 
     <para>
 <programlisting>
+TupleTableSlot **
+ExecForeignBulkInsert(EState *estate,
+                  ResultRelInfo *rinfo,
+                  TupleTableSlot **slots,
+                  TupleTableSlot *planSlots,
+                  int *numSlots);
+</programlisting>
+
+     Insert multiple tuples in bulk into the foreign table.
+     The parameters are the same for <function>ExecForeignInsert</function>
+     except <literal>slots</literal> and <literal>planSlots</literal> contain
+     multiple tuples and <literal>*numSlots></literal> specifies the number of
+     tuples in those arrays.
+    </para>
+
+    <para>
+     The return value is an array of slots containing the data that was
+     actually inserted (this might differ from the data supplied, for
+     example as a result of trigger actions.)
+     The passed-in <literal>slots</literal> can be re-used for this purpose.
+     The number of successfully inserted tuples is returned in
+     <literal>*numSlots</literal>.
+    </para>
+
+    <para>
+     The data in the returned slot is used only if the <command>INSERT</command>
+     statement involves a view
+     <literal>WITH CHECK OPTION</literal>; or if the foreign table has
+     an <literal>AFTER ROW</literal> trigger.  Triggers require all columns,
+     but the FDW could choose to optimize away returning some or all columns
+     depending on the contents of the
+     <literal>WITH CHECK OPTION</literal> constraints.
+    </para>
+
+    <para>
+     If the <function>ExecForeignBulkInsert</function> pointer is set to
+     <literal>NULL</literal>, attempts to insert into the foreign table will
+     use <function>ExecForeignInsert</function>.
+     This function is not used if the <command>INSERT</command> has the
+     <literal>RETURNING></literal> clause.
+    </para>
+
+    <para>
+     Note that this function is also called when inserting routed tuples into
+     a foreign-table partition.  See the callback functions
+     described below that allow the FDW to support that.
+    </para>
+
+    <para>
+<programlisting>
 TupleTableSlot *
 ExecForeignUpdate(EState *estate,
                   ResultRelInfo *rinfo,
@@ -741,8 +792,9 @@ BeginForeignInsert(ModifyTableState *mtstate,
      in both cases when it is the partition chosen for tuple routing and the
      target specified in a <command>COPY FROM</command> command.  It should
      perform any initialization needed prior to the actual insertion.
-     Subsequently, <function>ExecForeignInsert</function> will be called for
-     each tuple to be inserted into the foreign table.
+     Subsequently, <function>ExecForeignInsert</function> or
+     <function>ExecForeignBulkInsert</function> will be called for
+     tuple(s) to be inserted into the foreign table.
     </para>
 
     <para>
@@ -773,8 +825,8 @@ BeginForeignInsert(ModifyTableState *mtstate,
     <para>
      Note that if the FDW does not support routable foreign-table partitions
      and/or executing <command>COPY FROM</command> on foreign tables, this
-     function or <function>ExecForeignInsert</function> subsequently called
-     must throw error as needed.
+     function or <function>ExecForeignInsert/ExecForeignBulkInsert</function>
+     subsequently called must throw error as needed.
     </para>
 
     <para>
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 29e07b7..6c4b33e 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -58,6 +58,15 @@
 #include "utils/rel.h"
 
 
+#define BULK_INSERT_ROWS 100
+
+static void ExecBulkInsert(ModifyTableState *mtstate,
+								 ResultRelInfo *resultRelInfo,
+								 TupleTableSlot **slots,
+								 TupleTableSlot **planSlots,
+								 int numSlots,
+								 EState *estate,
+								 bool canSetTag);
 static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
 								 ResultRelInfo *resultRelInfo,
 								 ItemPointer conflictTid,
@@ -389,6 +398,7 @@ ExecInsert(ModifyTableState *mtstate,
 	ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+	MemoryContext oldContext;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -442,6 +452,55 @@ ExecInsert(ModifyTableState *mtstate,
 									   CMD_INSERT);
 
 		/*
+		 * If the FDW supports bulk insert, accumulate tuples and insert them
+		 * in bulk
+		 */
+		if (resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert &&
+			resultRelInfo->ri_projectReturning == NULL)
+		{
+			/*
+			 * If a certain number of tuples have already been accumulated,
+			 * or	a tuple has come for a different relation than that for
+			 * the accumulated tuples, perform the bulk insert
+			 */
+			if (mtstate->mt_nslots == BULK_INSERT_ROWS ||
+				(mtstate->mt_nslots > 0 &&
+				 mtstate->bulk_rri != resultRelInfo))
+			{
+				ExecBulkInsert(mtstate, resultRelInfo,
+							   mtstate->mt_slots, mtstate->mt_planslots,
+							   mtstate->mt_nslots,
+							   estate, canSetTag);
+				mtstate->mt_nslots = 0;
+			}
+
+			oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+			if (mtstate->mt_slots == NULL)
+			{
+				mtstate->mt_slots = palloc(sizeof(TupleTableSlot *) *
+										   BULK_INSERT_ROWS);
+				mtstate->mt_planslots = palloc(sizeof(TupleTableSlot *) *
+										   BULK_INSERT_ROWS);
+			}
+
+			mtstate->mt_slots[mtstate->mt_nslots] =
+				MakeSingleTupleTableSlot(slot->tts_tupleDescriptor,
+										 slot->tts_ops);
+			ExecCopySlot(mtstate->mt_slots[mtstate->mt_nslots], slot);
+			mtstate->mt_planslots[mtstate->mt_nslots] =
+				MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor,
+										 planSlot->tts_ops);
+			ExecCopySlot(mtstate->mt_planslots[mtstate->mt_nslots], planSlot);
+
+			mtstate->mt_nslots++;
+			mtstate->bulk_rri = resultRelInfo;
+
+			MemoryContextSwitchTo(oldContext);
+			return NULL;
+		}
+
+		/*
 		 * insert into foreign table: let the FDW do it
 		 */
 		slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
@@ -702,6 +761,73 @@ ExecInsert(ModifyTableState *mtstate,
 }
 
 /* ----------------------------------------------------------------
+ *		ExecBulkInsert
+ *
+ *		Insert multiple tuples in an efficient way.
+ *		Currently, this handles inserting into a foreign table without
+ *		RETURNING clause.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecBulkInsert(ModifyTableState *mtstate,
+		   ResultRelInfo *resultRelInfo,
+		   TupleTableSlot **slots,
+		   TupleTableSlot **planSlots,
+		   int numSlots,
+		   EState *estate,
+		   bool canSetTag)
+{
+	int			i;
+	int			numInserted = numSlots;
+	TupleTableSlot *slot = NULL;
+	TupleTableSlot **rslots;
+
+	/*
+	 * insert into foreign table: let the FDW do it
+	 */
+	rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert(estate,
+																 resultRelInfo,
+																 slots,
+																 planSlots,
+																 &numInserted);
+
+	for (i = 0; i < numInserted; i++)
+	{
+		slot = rslots[i];
+
+		/*
+		 * AFTER ROW Triggers or RETURNING expressions might reference the
+		 * tableoid column, so (re-)initialize tts_tableOid before evaluating
+		 * them.
+		 */
+		slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+		/* AFTER ROW INSERT Triggers */
+		ExecARInsertTriggers(estate, resultRelInfo, slot, NIL,
+							 mtstate->mt_transition_capture);
+
+		/*
+		 * Check any WITH CHECK OPTION constraints from parent views.  See the
+		 * comment in ExecInsert.
+		 */
+		if (resultRelInfo->ri_WithCheckOptions != NIL)
+			ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
+	}
+
+	if (canSetTag && numInserted > 0)
+	{
+		estate->es_processed += numInserted;
+		setLastTid(&slot->tts_tid);
+	}
+
+	for (i = 0; i < numSlots; i++)
+	{
+		ExecDropSingleTupleTableSlot(slots[i]);
+		ExecDropSingleTupleTableSlot(planSlots[i]);
+	}
+}
+
+/* ----------------------------------------------------------------
  *		ExecDelete
  *
  *		DELETE is like UPDATE, except that we delete the tuple and no
@@ -2156,6 +2282,15 @@ ExecModifyTable(PlanState *pstate)
 	}
 
 	/*
+	 * Insert remaining tuples for bulk insert.
+	 */
+	if (node->mt_nslots > 0)
+		ExecBulkInsert(node, node->bulk_rri,
+					   node->mt_slots, node->mt_planslots,
+					   node->mt_nslots,
+					   estate, node->canSetTag);
+
+	/*
 	 * We're done, but fire AFTER STATEMENT triggers before exiting.
 	 */
 	fireASTriggers(node);
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556df..c7eeff2 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -85,6 +85,12 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate,
 													   TupleTableSlot *slot,
 													   TupleTableSlot *planSlot);
 
+typedef TupleTableSlot **(*ExecForeignBulkInsert_function) (EState *estate,
+													   ResultRelInfo *rinfo,
+													   TupleTableSlot **slots,
+													   TupleTableSlot **planSlots,
+													   int *numSlots);
+
 typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate,
 													   ResultRelInfo *rinfo,
 													   TupleTableSlot *slot,
@@ -209,6 +215,7 @@ typedef struct FdwRoutine
 	PlanForeignModify_function PlanForeignModify;
 	BeginForeignModify_function BeginForeignModify;
 	ExecForeignInsert_function ExecForeignInsert;
+	ExecForeignBulkInsert_function ExecForeignBulkInsert;
 	ExecForeignUpdate_function ExecForeignUpdate;
 	ExecForeignDelete_function ExecForeignDelete;
 	EndForeignModify_function EndForeignModify;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6c0a7d6..e16d7a1 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1191,6 +1191,12 @@ typedef struct ModifyTableState
 
 	/* controls transition table population for INSERT...ON CONFLICT UPDATE */
 	struct TransitionCaptureState *mt_oc_transition_capture;
+
+	/* bulk insert stuff */
+	int			mt_nslots;		/* number of slots in the array */
+	TupleTableSlot **mt_slots;	/* input tuples for bulk insert */
+	TupleTableSlot **mt_planslots;
+	ResultRelInfo *bulk_rri;	/* target relation for bulk insert */
 } ModifyTableState;
 
 /* ----------------
-- 
2.10.1

