Hi,

Attached is a v6 of this patch, rebased to current master and with some minor improvements (mostly comments and renaming the "end" struct field to "values_end" which I think is more descriptive).

The one thing that keeps bugging me is convert_prep_stmt_params - it does the right thing, but the code is somewhat confusing.


AFAICS the discussions about making this use COPY and/or libpq pipelining (neither of which is committed yet) ended with the conclusion that those changes are somewhat independent, and that it's worth getting this committed in the current form. Barring objections, I'll push this within the next couple days.


regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
>From 1e4a99c6d4a5221dadc9e7a9922bdd9e3ebe1310 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 12 Jan 2021 01:36:01 +0100
Subject: [PATCH] Add bulk insert for foreign tables

---
 contrib/postgres_fdw/deparse.c                |  43 ++-
 .../postgres_fdw/expected/postgres_fdw.out    | 116 ++++++-
 contrib/postgres_fdw/option.c                 |  14 +
 contrib/postgres_fdw/postgres_fdw.c           | 291 ++++++++++++++----
 contrib/postgres_fdw/postgres_fdw.h           |   5 +-
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  91 ++++++
 doc/src/sgml/fdwhandler.sgml                  |  89 +++++-
 doc/src/sgml/postgres-fdw.sgml                |  13 +
 src/backend/executor/execPartition.c          |  11 +
 src/backend/executor/nodeModifyTable.c        | 161 ++++++++++
 src/backend/nodes/list.c                      |  15 +
 src/include/executor/execPartition.h          |   1 +
 src/include/foreign/fdwapi.h                  |  10 +
 src/include/nodes/execnodes.h                 |   6 +
 src/include/nodes/pg_list.h                   |  15 +
 15 files changed, 815 insertions(+), 66 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 3cf7b4eb1e..2d38ab25cb 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1711,7 +1711,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 				 Index rtindex, Relation rel,
 				 List *targetAttrs, bool doNothing,
 				 List *withCheckOptionList, List *returningList,
-				 List **retrieved_attrs)
+				 List **retrieved_attrs, int *values_end_len)
 {
 	AttrNumber	pindex;
 	bool		first;
@@ -1754,6 +1754,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 	}
 	else
 		appendStringInfoString(buf, " DEFAULT VALUES");
+	*values_end_len = buf->len;
 
 	if (doNothing)
 		appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
@@ -1763,6 +1764,46 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 						 withCheckOptionList, returningList, retrieved_attrs);
 }
 
+/*
+ * rebuild remote INSERT statement: append to buf a copy of orig_query with
+ * num_rows extra records of num_cols parameters added at values_end_len
+ */
+void
+rebuildInsertSql(StringInfo buf, char *orig_query,
+				 int values_end_len, int num_cols,
+				 int num_rows)
+{
+	int			i, j;
+	int			pindex;
+	bool		first;
+
+	/* Copy the original query up to values_end_len (end of first record) */
+	appendBinaryStringInfo(buf, orig_query, values_end_len);
+
+	/* Add num_rows records, numbering parameters from num_cols + 1 onward */
+	pindex = num_cols + 1;
+	for (i = 0; i < num_rows; i++)
+	{
+		appendStringInfoString(buf, ", (");
+
+		first = true;
+		for (j = 0; j < num_cols; j++)
+		{
+			if (!first)
+				appendStringInfoString(buf, ", ");
+			first = false;
+
+			appendStringInfo(buf, "$%d", pindex);
+			pindex++;
+		}
+
+		appendStringInfoChar(buf, ')');
+	}
+
+	/* Copy the rest of the original query, from values_end_len onward */
+	appendStringInfoString(buf, orig_query + values_end_len);
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index c11092f8cc..96bad17ded 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8911,7 +8911,7 @@ DO $d$
     END;
 $d$;
 ERROR:  invalid option "password"
-HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
+HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
 CONTEXT:  SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
 PL/pgSQL function inline_code_block line 3 at EXECUTE
 -- If we add a password for our user mapping instead, we should get a different
@@ -9053,3 +9053,117 @@ SELECT 1 FROM ft1 LIMIT 1;
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+-- ===================================================================
+-- batch insert
+-- ===================================================================
+BEGIN;
+CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' );
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+ count 
+-------
+     1
+(1 row)
+
+ALTER SERVER batch10 OPTIONS( SET batch_size '20' );
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+ count 
+-------
+     0
+(1 row)
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=20'];
+ count 
+-------
+     1
+(1 row)
+
+CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' );
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+ count 
+-------
+     1
+(1 row)
+
+ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40');
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+ count 
+-------
+     0
+(1 row)
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=40'];
+ count 
+-------
+     1
+(1 row)
+
+ROLLBACK;
+CREATE TABLE batch_table ( x int );
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' );
+INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
+INSERT INTO ftable SELECT * FROM generate_series(11, 31) i;
+INSERT INTO ftable VALUES (32);
+INSERT INTO ftable VALUES (33), (34);
+SELECT COUNT(*) FROM ftable;
+ count 
+-------
+    34
+(1 row)
+
+TRUNCATE batch_table;
+DROP FOREIGN TABLE ftable;
+-- Disable batch insert
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' );
+INSERT INTO ftable VALUES (1), (2);
+SELECT COUNT(*) FROM ftable;
+ count 
+-------
+     2
+(1 row)
+
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+-- Use partitioning
+CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x);
+CREATE TABLE batch_table_p0 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p0f
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 0)
+	SERVER loopback
+	OPTIONS (table_name 'batch_table_p0', batch_size '10');
+CREATE TABLE batch_table_p1 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p1f
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 1)
+	SERVER loopback
+	OPTIONS (table_name 'batch_table_p1', batch_size '1');
+CREATE TABLE batch_table_p2
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 2);
+INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i;
+SELECT COUNT(*) FROM batch_table;
+ count 
+-------
+    66
+(1 row)
+
+-- Clean up
+DROP TABLE batch_table CASCADE;
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 1fec3c3eea..64698c4da3 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -142,6 +142,17 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 						 errmsg("%s requires a non-negative integer value",
 								def->defname)));
 		}
+		else if (strcmp(def->defname, "batch_size") == 0)
+		{
+			int			batch_size;
+
+			batch_size = strtol(defGetString(def), NULL, 10);
+			if (batch_size <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("%s requires a non-negative integer value",
+								def->defname)));
+		}
 		else if (strcmp(def->defname, "password_required") == 0)
 		{
 			bool		pw_required = defGetBoolean(def);
@@ -203,6 +214,9 @@ InitPgFdwOptions(void)
 		/* fetch_size is available on both server and table */
 		{"fetch_size", ForeignServerRelationId, false},
 		{"fetch_size", ForeignTableRelationId, false},
+		/* batch_size is available on both server and table */
+		{"batch_size", ForeignServerRelationId, false},
+		{"batch_size", ForeignTableRelationId, false},
 		{"password_required", UserMappingRelationId, false},
 
 		/*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 2f2d4d171c..e6b1403ff1 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -87,8 +87,10 @@ enum FdwScanPrivateIndex
  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
  * 2) Integer list of target attribute numbers for INSERT/UPDATE
  *	  (NIL for a DELETE)
- * 3) Boolean flag showing if the remote query has a RETURNING clause
- * 4) Integer list of attribute numbers retrieved by RETURNING, if any
+ * 3) Length till the end of VALUES clause for INSERT
+ *	  (-1 for a DELETE/UPDATE)
+ * 4) Boolean flag showing if the remote query has a RETURNING clause
+ * 5) Integer list of attribute numbers retrieved by RETURNING, if any
  */
 enum FdwModifyPrivateIndex
 {
@@ -96,6 +98,8 @@ enum FdwModifyPrivateIndex
 	FdwModifyPrivateUpdateSql,
 	/* Integer list of target attribute numbers for INSERT/UPDATE */
 	FdwModifyPrivateTargetAttnums,
+	/* Length till the end of VALUES clause (as an integer Value node) */
+	FdwModifyPrivateLen,
 	/* has-returning flag (as an integer Value node) */
 	FdwModifyPrivateHasReturning,
 	/* Integer list of attribute numbers retrieved by RETURNING */
@@ -176,7 +180,10 @@ typedef struct PgFdwModifyState
 
 	/* extracted fdw_private data */
 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
+	char	   *orig_query;		/* original text of INSERT command */
 	List	   *target_attrs;	/* list of target attribute numbers */
+	int			values_end;		/* length up to the end of VALUES */
+	int			batch_size;		/* value of FDW option "batch_size" */
 	bool		has_returning;	/* is there a RETURNING clause? */
 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
 
@@ -185,6 +192,9 @@ typedef struct PgFdwModifyState
 	int			p_nums;			/* number of parameters to transmit */
 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
 
+	/* batch operation stuff */
+	int			num_slots;		/* number of slots to insert */
+
 	/* working memory context */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
@@ -343,6 +353,12 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
 												 TupleTableSlot *planSlot);
+static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate,
+												 ResultRelInfo *resultRelInfo,
+												 TupleTableSlot **slots,
+												 TupleTableSlot **planSlots,
+												 int *numSlots);
+static int	postgresGetModifyBatchSize(ResultRelInfo *resultRelInfo);
 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
@@ -429,20 +445,24 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
 											   Plan *subplan,
 											   char *query,
 											   List *target_attrs,
+											   int len,
 											   bool has_returning,
 											   List *retrieved_attrs);
-static TupleTableSlot *execute_foreign_modify(EState *estate,
+static TupleTableSlot **execute_foreign_modify(EState *estate,
 											  ResultRelInfo *resultRelInfo,
 											  CmdType operation,
-											  TupleTableSlot *slot,
-											  TupleTableSlot *planSlot);
+											  TupleTableSlot **slots,
+											  TupleTableSlot **planSlots,
+											  int *numSlots);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 											 ItemPointer tupleid,
-											 TupleTableSlot *slot);
+											 TupleTableSlot **slots,
+											 int numSlots);
 static void store_returning_result(PgFdwModifyState *fmstate,
 								   TupleTableSlot *slot, PGresult *res);
 static void finish_foreign_modify(PgFdwModifyState *fmstate);
+static void deallocate_query(PgFdwModifyState *fmstate);
 static List *build_remote_returning(Index rtindex, Relation rel,
 									List *returningList);
 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -530,6 +550,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->PlanForeignModify = postgresPlanForeignModify;
 	routine->BeginForeignModify = postgresBeginForeignModify;
 	routine->ExecForeignInsert = postgresExecForeignInsert;
+	routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert;
+	routine->GetModifyBatchSize = postgresGetModifyBatchSize;
 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
 	routine->ExecForeignDelete = postgresExecForeignDelete;
 	routine->EndForeignModify = postgresEndForeignModify;
@@ -1665,6 +1687,7 @@ postgresPlanForeignModify(PlannerInfo *root,
 	List	   *returningList = NIL;
 	List	   *retrieved_attrs = NIL;
 	bool		doNothing = false;
+	int			values_end_len = -1;
 
 	initStringInfo(&sql);
 
@@ -1752,7 +1775,7 @@ postgresPlanForeignModify(PlannerInfo *root,
 			deparseInsertSql(&sql, rte, resultRelation, rel,
 							 targetAttrs, doNothing,
 							 withCheckOptionList, returningList,
-							 &retrieved_attrs);
+							 &retrieved_attrs, &values_end_len);
 			break;
 		case CMD_UPDATE:
 			deparseUpdateSql(&sql, rte, resultRelation, rel,
@@ -1776,8 +1799,9 @@ postgresPlanForeignModify(PlannerInfo *root,
 	 * Build the fdw_private list that will be available to the executor.
 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
 	 */
-	return list_make4(makeString(sql.data),
+	return list_make5(makeString(sql.data),
 					  targetAttrs,
+					  makeInteger(values_end_len),
 					  makeInteger((retrieved_attrs != NIL)),
 					  retrieved_attrs);
 }
@@ -1797,6 +1821,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	char	   *query;
 	List	   *target_attrs;
 	bool		has_returning;
+	int			values_end_len;
 	List	   *retrieved_attrs;
 	RangeTblEntry *rte;
 
@@ -1812,6 +1837,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 							FdwModifyPrivateUpdateSql));
 	target_attrs = (List *) list_nth(fdw_private,
 									 FdwModifyPrivateTargetAttnums);
+	values_end_len = intVal(list_nth(fdw_private,
+									FdwModifyPrivateLen));
 	has_returning = intVal(list_nth(fdw_private,
 									FdwModifyPrivateHasReturning));
 	retrieved_attrs = (List *) list_nth(fdw_private,
@@ -1829,6 +1856,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 									mtstate->mt_plans[subplan_index]->plan,
 									query,
 									target_attrs,
+									values_end_len,
 									has_returning,
 									retrieved_attrs);
 
@@ -1846,7 +1874,8 @@ postgresExecForeignInsert(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	TupleTableSlot *rslot;
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
 
 	/*
 	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
@@ -1855,7 +1884,36 @@ postgresExecForeignInsert(EState *estate,
 	if (fmstate->aux_fmstate)
 		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
 	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
-								   slot, planSlot);
+								   &slot, &planSlot, &numSlots);
+	/* Revert that change */
+	if (fmstate->aux_fmstate)
+		resultRelInfo->ri_FdwState = fmstate;
+
+	return rslot ? *rslot : NULL;
+}
+
+/*
+ * postgresExecForeignBatchInsert
+ *		Insert multiple rows into a foreign table
+ */
+static TupleTableSlot **
+postgresExecForeignBatchInsert(EState *estate,
+						  ResultRelInfo *resultRelInfo,
+						  TupleTableSlot **slots,
+						  TupleTableSlot **planSlots,
+						  int *numSlots)
+{
+	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+	TupleTableSlot **rslot;
+
+	/*
+	 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
+	 * postgresBeginForeignInsert())
+	 */
+	if (fmstate->aux_fmstate)
+		resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
+								   slots, planSlots, numSlots);
 	/* Revert that change */
 	if (fmstate->aux_fmstate)
 		resultRelInfo->ri_FdwState = fmstate;
@@ -1863,6 +1921,16 @@ postgresExecForeignInsert(EState *estate,
 	return rslot;
 }
 
+/*
+ * postgresGetModifyBatchSize
+ *		Report the batch_size kept in the relation's PgFdwModifyState
+ */
+static int
+postgresGetModifyBatchSize(ResultRelInfo *resultRelInfo)
+{
+	return ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->batch_size;
+}
+
 /*
  * postgresExecForeignUpdate
  *		Update one row in a foreign table
@@ -1873,8 +1941,13 @@ postgresExecForeignUpdate(EState *estate,
 						  TupleTableSlot *slot,
 						  TupleTableSlot *planSlot)
 {
-	return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
-								  slot, planSlot);
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
+								  &slot, &planSlot, &numSlots);
+
+	return rslot ? rslot[0] : NULL;
 }
 
 /*
@@ -1887,8 +1960,13 @@ postgresExecForeignDelete(EState *estate,
 						  TupleTableSlot *slot,
 						  TupleTableSlot *planSlot)
 {
-	return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
-								  slot, planSlot);
+	TupleTableSlot **rslot;
+	int 			numSlots = 1;
+
+	rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
+								  &slot, &planSlot, &numSlots);
+
+	return rslot ? rslot[0] : NULL;
 }
 
 /*
@@ -1925,6 +2003,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	RangeTblEntry *rte;
 	TupleDesc	tupdesc = RelationGetDescr(rel);
 	int			attnum;
+	int			values_end_len;
 	StringInfoData sql;
 	List	   *targetAttrs = NIL;
 	List	   *retrieved_attrs = NIL;
@@ -2001,7 +2080,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 	deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
 					 resultRelInfo->ri_WithCheckOptions,
 					 resultRelInfo->ri_returningList,
-					 &retrieved_attrs);
+					 &retrieved_attrs, &values_end_len);
 
 	/* Construct an execution state. */
 	fmstate = create_foreign_modify(mtstate->ps.state,
@@ -2011,6 +2090,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 									NULL,
 									sql.data,
 									targetAttrs,
+									values_end_len,
 									retrieved_attrs != NIL,
 									retrieved_attrs);
 
@@ -2636,6 +2716,9 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
 										  FdwModifyPrivateUpdateSql));
 
 		ExplainPropertyText("Remote SQL", sql, es);
+
+		if (rinfo->ri_BatchSize > 0)
+			ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
 	}
 }
 
@@ -3530,6 +3613,7 @@ create_foreign_modify(EState *estate,
 					  Plan *subplan,
 					  char *query,
 					  List *target_attrs,
+					  int values_end,
 					  bool has_returning,
 					  List *retrieved_attrs)
 {
@@ -3538,6 +3622,7 @@ create_foreign_modify(EState *estate,
 	TupleDesc	tupdesc = RelationGetDescr(rel);
 	Oid			userid;
 	ForeignTable *table;
+	ForeignServer *server;
 	UserMapping *user;
 	AttrNumber	n_params;
 	Oid			typefnoid;
@@ -3564,7 +3649,10 @@ create_foreign_modify(EState *estate,
 
 	/* Set up remote query information. */
 	fmstate->query = query;
+	if (operation == CMD_INSERT)
+		fmstate->orig_query = pstrdup(fmstate->query);
 	fmstate->target_attrs = target_attrs;
+	fmstate->values_end = values_end;
 	fmstate->has_returning = has_returning;
 	fmstate->retrieved_attrs = retrieved_attrs;
 
@@ -3616,6 +3704,44 @@ create_foreign_modify(EState *estate,
 
 	Assert(fmstate->p_nums <= n_params);
 
+	/* Set batch_size from foreign server/table options. */
+	if (operation == CMD_INSERT)
+	{
+		/* Check the foreign table option. */
+		foreach(lc, table->options)
+		{
+			DefElem    *def = (DefElem *) lfirst(lc);
+
+			if (strcmp(def->defname, "batch_size") == 0)
+			{
+				fmstate->batch_size = strtol(defGetString(def), NULL, 10);
+				break;
+			}
+		}
+
+		/* Check the foreign server option if the table option is not set. */
+		if (fmstate->batch_size == 0)
+		{
+			server = GetForeignServer(table->serverid);
+			foreach(lc, server->options)
+			{
+				DefElem    *def = (DefElem *) lfirst(lc);
+
+				if (strcmp(def->defname, "batch_size") == 0)
+				{
+					fmstate->batch_size = strtol(defGetString(def), NULL, 10);
+					break;
+				}
+			}
+		}
+
+		/* If neither the table nor server option is set, set the default. */
+		if (fmstate->batch_size == 0)
+			fmstate->batch_size = 100;
+	}
+
+	fmstate->num_slots = 1;
+
 	/* Initialize auxiliary state */
 	fmstate->aux_fmstate = NULL;
 
@@ -3626,26 +3752,50 @@ create_foreign_modify(EState *estate,
  * execute_foreign_modify
  *		Perform foreign-table modification as required, and fetch RETURNING
  *		result if any.  (This is the shared guts of postgresExecForeignInsert,
- *		postgresExecForeignUpdate, and postgresExecForeignDelete.)
+ *		postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
+ *		postgresExecForeignDelete.)
  */
-static TupleTableSlot *
+static TupleTableSlot **
 execute_foreign_modify(EState *estate,
 					   ResultRelInfo *resultRelInfo,
 					   CmdType operation,
-					   TupleTableSlot *slot,
-					   TupleTableSlot *planSlot)
+					   TupleTableSlot **slots,
+					   TupleTableSlot **planSlots,
+					   int *numSlots)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
 	ItemPointer ctid = NULL;
 	const char **p_values;
 	PGresult   *res;
 	int			n_rows;
+	StringInfoData sql;
 
 	/* The operation should be INSERT, UPDATE, or DELETE */
 	Assert(operation == CMD_INSERT ||
 		   operation == CMD_UPDATE ||
 		   operation == CMD_DELETE);
 
+	/*
+	 * If the existing query was deparsed and prepared for a different number
+	 * of rows, rebuild it for the proper number.
+	 */
+	if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
+	{
+		/* Destroy the prepared statement created previously */
+		if (fmstate->p_name)
+			deallocate_query(fmstate);
+
+		/*
+		 * Build INSERT string with numSlots records in its VALUES clause.
+		 */
+		initStringInfo(&sql);
+		rebuildInsertSql(&sql, fmstate->orig_query, fmstate->values_end,
+						 fmstate->p_nums, *numSlots - 1);
+		pfree(fmstate->query);
+		fmstate->query = sql.data;
+		fmstate->num_slots = *numSlots;
+	}
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -3658,7 +3808,7 @@ execute_foreign_modify(EState *estate,
 		Datum		datum;
 		bool		isNull;
 
-		datum = ExecGetJunkAttribute(planSlot,
+		datum = ExecGetJunkAttribute(planSlots[0],
 									 fmstate->ctidAttno,
 									 &isNull);
 		/* shouldn't ever get a null result... */
@@ -3668,14 +3818,14 @@ execute_foreign_modify(EState *estate,
 	}
 
 	/* Convert parameters needed by prepared statement to text form */
-	p_values = convert_prep_stmt_params(fmstate, ctid, slot);
+	p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
 
 	/*
 	 * Execute the prepared statement.
 	 */
 	if (!PQsendQueryPrepared(fmstate->conn,
 							 fmstate->p_name,
-							 fmstate->p_nums,
+							 fmstate->p_nums * (*numSlots),
 							 p_values,
 							 NULL,
 							 NULL,
@@ -3696,9 +3846,10 @@ execute_foreign_modify(EState *estate,
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
 	{
+		Assert(*numSlots == 1);
 		n_rows = PQntuples(res);
 		if (n_rows > 0)
-			store_returning_result(fmstate, slot, res);
+			store_returning_result(fmstate, slots[0], res);
 	}
 	else
 		n_rows = atoi(PQcmdTuples(res));
@@ -3708,10 +3859,12 @@ execute_foreign_modify(EState *estate,
 
 	MemoryContextReset(fmstate->temp_cxt);
 
+	*numSlots = n_rows;
+
 	/*
 	 * Return NULL if nothing was inserted/updated/deleted on the remote end
 	 */
-	return (n_rows > 0) ? slot : NULL;
+	return (n_rows > 0) ? slots : NULL;
 }
 
 /*
@@ -3771,52 +3924,64 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
-						 TupleTableSlot *slot)
+						 TupleTableSlot **slots,
+						 int numSlots)
 {
 	const char **p_values;
+	int			i;
+	int			j;
 	int			pindex = 0;
 	MemoryContext oldcontext;
 
 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
 
-	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
+	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
+
+	/* ctid is provided only for UPDATE/DELETE, which don't allow batching */
+	Assert(!(tupleid != NULL && numSlots > 1));
 
 	/* 1st parameter should be ctid, if it's in use */
 	if (tupleid != NULL)
 	{
+		Assert(numSlots == 1);
 		/* don't need set_transmission_modes for TID output */
 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
 											  PointerGetDatum(tupleid));
 		pindex++;
 	}
 
-	/* get following parameters from slot */
-	if (slot != NULL && fmstate->target_attrs != NIL)
+	/* get following parameters from slots */
+	if (slots != NULL && fmstate->target_attrs != NIL)
 	{
 		int			nestlevel;
 		ListCell   *lc;
 
 		nestlevel = set_transmission_modes();
 
-		foreach(lc, fmstate->target_attrs)
+		for (i = 0; i < numSlots; i++)
 		{
-			int			attnum = lfirst_int(lc);
-			Datum		value;
-			bool		isnull;
+			j = (tupleid != NULL) ? 1 : 0;
+			foreach(lc, fmstate->target_attrs)
+			{
+				int			attnum = lfirst_int(lc);
+				Datum		value;
+				bool		isnull;
 
-			value = slot_getattr(slot, attnum, &isnull);
-			if (isnull)
-				p_values[pindex] = NULL;
-			else
-				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
-													  value);
-			pindex++;
+				value = slot_getattr(slots[i], attnum, &isnull);
+				if (isnull)
+					p_values[pindex] = NULL;
+				else
+					p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
+														  value);
+				pindex++;
+				j++;
+			}
 		}
 
 		reset_transmission_modes(nestlevel);
 	}
 
-	Assert(pindex == fmstate->p_nums);
+	Assert(pindex == fmstate->p_nums * numSlots);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -3870,29 +4035,41 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
 	Assert(fmstate != NULL);
 
 	/* If we created a prepared statement, destroy it */
-	if (fmstate->p_name)
-	{
-		char		sql[64];
-		PGresult   *res;
-
-		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
-
-		/*
-		 * We don't use a PG_TRY block here, so be careful not to throw error
-		 * without releasing the PGresult.
-		 */
-		res = pgfdw_exec_query(fmstate->conn, sql);
-		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
-		PQclear(res);
-		fmstate->p_name = NULL;
-	}
+	deallocate_query(fmstate);
 
 	/* Release remote connection */
 	ReleaseConnection(fmstate->conn);
 	fmstate->conn = NULL;
 }
 
+/*
+ * deallocate_query
+ *		Run DEALLOCATE for the prepared statement named by fmstate->p_name
+ *		(if one is set) on fmstate->conn, then reset p_name to NULL
+ */
+static void
+deallocate_query(PgFdwModifyState *fmstate)
+{
+	char		sql[64];
+	PGresult   *res;
+
+	/* nothing to do if no prepared statement exists (p_name is NULL) */
+	if (!fmstate->p_name)
+		return;
+
+	snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+
+	/*
+	 * We don't use a PG_TRY block here, so be careful not to throw error
+	 * without releasing the PGresult.
+	 */
+	res = pgfdw_exec_query(fmstate->conn, sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+	PQclear(res);
+	fmstate->p_name = NULL;
+}
+
 /*
  * build_remote_returning
  *		Build a RETURNING targetlist of a remote query for performing an
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 19ea27a1bc..1f67b4d9fd 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -161,7 +161,10 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs, bool doNothing,
 							 List *withCheckOptionList, List *returningList,
-							 List **retrieved_attrs);
+							 List **retrieved_attrs, int *values_end_len);
+extern void rebuildInsertSql(StringInfo buf, char *orig_query,
+							 int values_end_len, int num_cols,
+							 int num_rows);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 25dbc08b98..fd5abf2471 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2711,3 +2711,94 @@ SELECT 1 FROM ft1 LIMIT 1;
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+
+-- ===================================================================
+-- batch insert
+-- ===================================================================
+
+BEGIN;
+
+CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' );
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+
+ALTER SERVER batch10 OPTIONS( SET batch_size '20' );
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=10'];
+
+SELECT count(*)
+FROM pg_foreign_server
+WHERE srvname = 'batch10'
+AND srvoptions @> array['batch_size=20'];
+
+CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' );
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+
+ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40');
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=30'];
+
+SELECT COUNT(*)
+FROM pg_foreign_table
+WHERE ftrelid = 'table30'::regclass
+AND ftoptions @> array['batch_size=40'];
+
+ROLLBACK;
+
+CREATE TABLE batch_table ( x int );
+
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' );
+INSERT INTO ftable SELECT * FROM generate_series(1, 10) i;
+INSERT INTO ftable SELECT * FROM generate_series(11, 31) i;
+INSERT INTO ftable VALUES (32);
+INSERT INTO ftable VALUES (33), (34);
+SELECT COUNT(*) FROM ftable;
+TRUNCATE batch_table;
+DROP FOREIGN TABLE ftable;
+
+-- Disable batch insert
+CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' );
+INSERT INTO ftable VALUES (1), (2);
+SELECT COUNT(*) FROM ftable;
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+
+-- Use partitioning
+CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x);
+
+CREATE TABLE batch_table_p0 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p0f
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 0)
+	SERVER loopback
+	OPTIONS (table_name 'batch_table_p0', batch_size '10');
+
+CREATE TABLE batch_table_p1 (LIKE batch_table);
+CREATE FOREIGN TABLE batch_table_p1f
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 1)
+	SERVER loopback
+	OPTIONS (table_name 'batch_table_p1', batch_size '1');
+
+CREATE TABLE batch_table_p2
+	PARTITION OF batch_table
+	FOR VALUES WITH (MODULUS 3, REMAINDER 2);
+
+INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i;
+SELECT COUNT(*) FROM batch_table;
+
+-- Clean up
+DROP TABLE batch_table CASCADE;
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 9c9293414c..02a34b40b3 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -523,8 +523,9 @@ BeginForeignModify(ModifyTableState *mtstate,
      Begin executing a foreign table modification operation.  This routine is
      called during executor startup.  It should perform any initialization
      needed prior to the actual table modifications.  Subsequently,
-     <function>ExecForeignInsert</function>, <function>ExecForeignUpdate</function> or
-     <function>ExecForeignDelete</function> will be called for each tuple to be
+     <function>ExecForeignInsert/ExecForeignBatchInsert</function>,
+     <function>ExecForeignUpdate</function> or
+     <function>ExecForeignDelete</function> will be called for tuple(s) to be
      inserted, updated, or deleted.
     </para>
 
@@ -614,6 +615,81 @@ ExecForeignInsert(EState *estate,
 
     <para>
 <programlisting>
+TupleTableSlot **
+ExecForeignBatchInsert(EState *estate,
+                  ResultRelInfo *rinfo,
+                  TupleTableSlot **slots,
+                  TupleTableSlot **planSlots,
+                  int *numSlots);
+</programlisting>
+
+     Insert multiple tuples in bulk into the foreign table.
+     The parameters are the same as for <function>ExecForeignInsert</function>
+     except <literal>slots</literal> and <literal>planSlots</literal> contain
+     multiple tuples and <literal>*numSlots</literal> specifies the number of
+     tuples in those arrays.
+    </para>
+
+    <para>
+     The return value is an array of slots containing the data that was
+     actually inserted (this might differ from the data supplied, for
+     example as a result of trigger actions.)
+     The passed-in <literal>slots</literal> can be re-used for this purpose.
+     The number of successfully inserted tuples is returned in
+     <literal>*numSlots</literal>.
+    </para>
+
+    <para>
+     The data in the returned slot is used only if the <command>INSERT</command>
+     statement involves a view
+     <literal>WITH CHECK OPTION</literal>; or if the foreign table has
+     an <literal>AFTER ROW</literal> trigger.  Triggers require all columns,
+     but the FDW could choose to optimize away returning some or all columns
+     depending on the contents of the
+     <literal>WITH CHECK OPTION</literal> constraints.
+    </para>
+
+    <para>
+     If the <function>ExecForeignBatchInsert</function> or
+     <function>GetModifyBatchSize</function> pointer is set to
+     <literal>NULL</literal>, attempts to insert into the foreign table will
+     use <function>ExecForeignInsert</function>.
+     This function is not used if the <command>INSERT</command> has a
+     <literal>RETURNING</literal> clause.
+    </para>
+
+    <para>
+     Note that this function is also called when inserting routed tuples into
+     a foreign-table partition.  See the callback functions
+     described below that allow the FDW to support that.
+    </para>
+
+    <para>
+<programlisting>
+int
+GetModifyBatchSize(ResultRelInfo *rinfo);
+</programlisting>
+
+     Report the maximum number of tuples that a single
+     <function>ExecForeignBatchInsert</function> call can handle for
+     the specified foreign table.  That is, the executor passes at most
+     the number of tuples that this function returns to
+     <function>ExecForeignBatchInsert</function>.
+     <literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
+     the target foreign table.
+     The FDW is expected to provide a foreign server and/or foreign
+     table option for the user to set this value, or some hard-coded value.
+    </para>
+
+    <para>
+     If the <function>ExecForeignBatchInsert</function> or
+     <function>GetModifyBatchSize</function> pointer is set to
+     <literal>NULL</literal>, attempts to insert into the foreign table will
+     use <function>ExecForeignInsert</function>.
+    </para>
+
+    <para>
+<programlisting>
 TupleTableSlot *
 ExecForeignUpdate(EState *estate,
                   ResultRelInfo *rinfo,
@@ -741,8 +817,9 @@ BeginForeignInsert(ModifyTableState *mtstate,
      in both cases when it is the partition chosen for tuple routing and the
      target specified in a <command>COPY FROM</command> command.  It should
      perform any initialization needed prior to the actual insertion.
-     Subsequently, <function>ExecForeignInsert</function> will be called for
-     each tuple to be inserted into the foreign table.
+     Subsequently, <function>ExecForeignInsert</function> or
+     <function>ExecForeignBatchInsert</function> will be called for
+     tuple(s) to be inserted into the foreign table.
     </para>
 
     <para>
@@ -773,8 +850,8 @@ BeginForeignInsert(ModifyTableState *mtstate,
     <para>
      Note that if the FDW does not support routable foreign-table partitions
      and/or executing <command>COPY FROM</command> on foreign tables, this
-     function or <function>ExecForeignInsert</function> subsequently called
-     must throw error as needed.
+     function or <function>ExecForeignInsert/ExecForeignBatchInsert</function>
+     subsequently called must throw error as needed.
     </para>
 
     <para>
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index e6fd2143c1..97eeb64a02 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -354,6 +354,19 @@ OPTIONS (ADD password_required 'false');
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><literal>batch_size</literal></term>
+     <listitem>
+      <para>
+       This option specifies the number of rows <filename>postgres_fdw</filename>
+       should insert in each insert operation. It can be specified for a
+       foreign table or a foreign server. The option specified on a table
+       overrides an option specified for the server.
+       The default is <literal>100</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
    </variablelist>
 
   </sect3>
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 941731a0a9..b0a354ad6f 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -2192,3 +2192,14 @@ find_matching_subplans_recurse(PartitionPruningData *prunedata,
 		}
 	}
 }
+
+/*
+ * ExecGetTouchedPartitions -- return proute's partitions array and
+ * store its length (proute->num_partitions) in *count
+ */
+ResultRelInfo **
+ExecGetTouchedPartitions(PartitionTupleRouting *proute, int *count)
+{
+	*count = proute->num_partitions;
+	return proute->partitions;
+}
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index d7b8f65591..bf77d2491c 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -58,6 +58,13 @@
 #include "utils/rel.h"
 
 
+static void ExecBatchInsert(ModifyTableState *mtstate,
+								 ResultRelInfo *resultRelInfo,
+								 TupleTableSlot **slots,
+								 TupleTableSlot **planSlots,
+								 int numSlots,
+								 EState *estate,
+								 bool canSetTag);
 static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
 								 ResultRelInfo *resultRelInfo,
 								 ItemPointer conflictTid,
@@ -389,6 +396,7 @@ ExecInsert(ModifyTableState *mtstate,
 	ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+	MemoryContext oldContext;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -441,6 +449,70 @@ ExecInsert(ModifyTableState *mtstate,
 			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
 									   CMD_INSERT);
 
+		/*
+		 * Determine if the FDW supports batch insert and determine the batch
+		 * size (a FDW may support batching, but it may be disabled for the
+		 * server/table). Do this only once, at the beginning - we don't want
+		 * the batch size to change during execution.
+		 */
+		if (resultRelInfo->ri_FdwRoutine->GetModifyBatchSize &&
+			resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert &&
+			resultRelInfo->ri_projectReturning == NULL &&
+			resultRelInfo->ri_BatchSize == 0)
+			resultRelInfo->ri_BatchSize =
+				resultRelInfo->ri_FdwRoutine->GetModifyBatchSize(resultRelInfo);
+
+		Assert(resultRelInfo->ri_BatchSize >= 0);
+
+		/*
+		 * If the FDW supports batching, and batching is requested, accumulate
+		 * rows and insert them in batches. Otherwise use the per-row inserts.
+		 */
+		if (resultRelInfo->ri_BatchSize > 1)
+		{
+			/*
+			 * Tuples are buffered separately for each result relation, so
+			 * the only reason to flush here is that this relation's buffer
+			 * already holds ri_BatchSize tuples; if so,
+			 */
+			if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize)
+			{
+				ExecBatchInsert(mtstate, resultRelInfo,
+							   resultRelInfo->ri_Slots,
+							   resultRelInfo->ri_PlanSlots,
+							   resultRelInfo->ri_NumSlots,
+							   estate, canSetTag);
+				resultRelInfo->ri_NumSlots = 0;
+			}
+
+			oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+			if (resultRelInfo->ri_Slots == NULL)
+			{
+				resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) *
+										   resultRelInfo->ri_BatchSize);
+				resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) *
+										   resultRelInfo->ri_BatchSize);
+			}
+
+			resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
+				MakeSingleTupleTableSlot(slot->tts_tupleDescriptor,
+										 slot->tts_ops);
+			ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+						 slot);
+			resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
+				MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor,
+										 planSlot->tts_ops);
+			ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+						 planSlot);
+
+			resultRelInfo->ri_NumSlots++;
+
+			MemoryContextSwitchTo(oldContext);
+
+			return NULL;
+		}
+
 		/*
 		 * insert into foreign table: let the FDW do it
 		 */
@@ -698,6 +770,70 @@ ExecInsert(ModifyTableState *mtstate,
 	return result;
 }
 
+/* ----------------------------------------------------------------
+ *		ExecBatchInsert
+ *
+ *		Insert numSlots buffered tuples via the FDW's ExecForeignBatchInsert
+ *		(foreign tables, no RETURNING), run AFTER ROW INSERT triggers and
+ *		view WITH CHECK OPTION checks on the result, then drop all slots.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecBatchInsert(ModifyTableState *mtstate,
+		   ResultRelInfo *resultRelInfo,
+		   TupleTableSlot **slots,
+		   TupleTableSlot **planSlots,
+		   int numSlots,
+		   EState *estate,
+		   bool canSetTag)
+{
+	int			i;
+	int			numInserted = numSlots;
+	TupleTableSlot *slot = NULL;
+	TupleTableSlot **rslots;
+
+	/*
+	 * Let the FDW insert the batch; it reports the row count in numInserted.
+	 */
+	rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
+																 resultRelInfo,
+																 slots,
+																 planSlots,
+																 &numInserted);
+
+	for (i = 0; i < numInserted; i++)
+	{
+		slot = rslots[i];
+
+		/*
+		 * AFTER ROW triggers might reference the tableoid column, so
+		 * (re-)initialize tts_tableOid before firing them.  (RETURNING is
+		 * never evaluated in this function.)
+		 */
+		slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+		/* AFTER ROW INSERT Triggers */
+		ExecARInsertTriggers(estate, resultRelInfo, slot, NIL,
+							 mtstate->mt_transition_capture);
+
+		/*
+		 * Check any WITH CHECK OPTION constraints from parent views.  See the
+		 * comment in ExecInsert.
+		 */
+		if (resultRelInfo->ri_WithCheckOptions != NIL)
+			ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
+	}
+
+	if (canSetTag && numInserted > 0)
+		estate->es_processed += numInserted;
+
+	for (i = 0; i < numSlots; i++)
+	{
+		ExecDropSingleTupleTableSlot(slots[i]);
+		ExecDropSingleTupleTableSlot(planSlots[i]);
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecDelete
  *
@@ -1937,6 +2073,9 @@ ExecModifyTable(PlanState *pstate)
 	ItemPointerData tuple_ctid;
 	HeapTupleData oldtupdata;
 	HeapTuple	oldtuple;
+	PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
+	ResultRelInfo **resultRelInfos;
+	int			num_partitions;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -2152,6 +2291,28 @@ ExecModifyTable(PlanState *pstate)
 			return slot;
 	}
 
+	/*
+	 * Insert remaining tuples for batch insert.
+	 */
+	if (proute)
+		resultRelInfos = ExecGetTouchedPartitions(proute, &num_partitions);
+	else
+	{
+		resultRelInfos = &resultRelInfo;
+		num_partitions = 1;
+	}
+
+	for (int i = 0; i < num_partitions; i++)
+	{
+		resultRelInfo = resultRelInfos[i];
+		if (resultRelInfo->ri_NumSlots > 0)
+			ExecBatchInsert(node, resultRelInfo,
+						   resultRelInfo->ri_Slots,
+						   resultRelInfo->ri_PlanSlots,
+						   resultRelInfo->ri_NumSlots,
+						   estate, node->canSetTag);
+	}
+
 	/*
 	 * We're done, but fire AFTER STATEMENT triggers before exiting.
 	 */
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index c4eba6b053..dbf6b30233 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -277,6 +277,21 @@ list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
 	return list;
 }
 
+List *
+list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+				ListCell datum3, ListCell datum4, ListCell datum5)
+{
+	List	   *list = new_list(t, 5);	/* 5 = number of cells filled below */
+
+	list->elements[0] = datum1;
+	list->elements[1] = datum2;
+	list->elements[2] = datum3;
+	list->elements[3] = datum4;
+	list->elements[4] = datum5;
+	check_list_invariants(list);
+	return list;
+}
+
 /*
  * Make room for a new head cell in the given (non-NIL) list.
  *
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index d30ffde7d9..2bb5a85fb1 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -125,5 +125,6 @@ extern PartitionPruneState *ExecCreatePartitionPruneState(PlanState *planstate,
 extern Bitmapset *ExecFindMatchingSubPlans(PartitionPruneState *prunestate);
 extern Bitmapset *ExecFindInitialMatchingSubPlans(PartitionPruneState *prunestate,
 												  int nsubplans);
+extern ResultRelInfo **ExecGetTouchedPartitions(PartitionTupleRouting *proute, int *count);
 
 #endif							/* EXECPARTITION_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 2953499fb1..7946ca82f6 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -85,6 +85,14 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate,
 													   TupleTableSlot *slot,
 													   TupleTableSlot *planSlot);
 
+typedef TupleTableSlot **(*ExecForeignBatchInsert_function) (EState *estate,
+													   ResultRelInfo *rinfo,
+													   TupleTableSlot **slots,
+													   TupleTableSlot **planSlots,
+													   int *numSlots);
+
+typedef int (*GetModifyBatchSize_function) (ResultRelInfo *rinfo);
+
 typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate,
 													   ResultRelInfo *rinfo,
 													   TupleTableSlot *slot,
@@ -209,6 +217,8 @@ typedef struct FdwRoutine
 	PlanForeignModify_function PlanForeignModify;
 	BeginForeignModify_function BeginForeignModify;
 	ExecForeignInsert_function ExecForeignInsert;
+	ExecForeignBatchInsert_function ExecForeignBatchInsert;
+	GetModifyBatchSize_function GetModifyBatchSize;
 	ExecForeignUpdate_function ExecForeignUpdate;
 	ExecForeignDelete_function ExecForeignDelete;
 	EndForeignModify_function EndForeignModify;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 48c3f570fa..d65099c94a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -446,6 +446,12 @@ typedef struct ResultRelInfo
 	/* true when modifying foreign table directly */
 	bool		ri_usesFdwDirectModify;
 
+	/* batch insert stuff */
+	int			ri_NumSlots;		/* number of slots in the array */
+	int			ri_BatchSize;		/* max slots inserted in a single batch */
+	TupleTableSlot **ri_Slots;		/* input tuples for batch insert */
+	TupleTableSlot **ri_PlanSlots;
+
 	/* list of WithCheckOption's to be checked */
 	List	   *ri_WithCheckOptions;
 
diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h
index 710dcd37ef..404e03f132 100644
--- a/src/include/nodes/pg_list.h
+++ b/src/include/nodes/pg_list.h
@@ -213,6 +213,10 @@ list_length(const List *l)
 #define list_make4(x1,x2,x3,x4) \
 	list_make4_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
 					list_make_ptr_cell(x3), list_make_ptr_cell(x4))
+#define list_make5(x1,x2,x3,x4,x5) \
+	list_make5_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
+					list_make_ptr_cell(x3), list_make_ptr_cell(x4), \
+					list_make_ptr_cell(x5))
 
 #define list_make1_int(x1) \
 	list_make1_impl(T_IntList, list_make_int_cell(x1))
@@ -224,6 +228,10 @@ list_length(const List *l)
 #define list_make4_int(x1,x2,x3,x4) \
 	list_make4_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
 					list_make_int_cell(x3), list_make_int_cell(x4))
+#define list_make5_int(x1,x2,x3,x4,x5) \
+	list_make5_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
+					list_make_int_cell(x3), list_make_int_cell(x4), \
+					list_make_int_cell(x5))
 
 #define list_make1_oid(x1) \
 	list_make1_impl(T_OidList, list_make_oid_cell(x1))
@@ -235,6 +243,10 @@ list_length(const List *l)
 #define list_make4_oid(x1,x2,x3,x4) \
 	list_make4_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
 					list_make_oid_cell(x3), list_make_oid_cell(x4))
+#define list_make5_oid(x1,x2,x3,x4,x5) \
+	list_make5_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
+					list_make_oid_cell(x3), list_make_oid_cell(x4), \
+					list_make_oid_cell(x5))
 
 /*
  * Locate the n'th cell (counting from 0) of the list.
@@ -520,6 +532,9 @@ extern List *list_make3_impl(NodeTag t, ListCell datum1, ListCell datum2,
 							 ListCell datum3);
 extern List *list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
 							 ListCell datum3, ListCell datum4);
+extern List *list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+							 ListCell datum3, ListCell datum4,
+							 ListCell datum5);
 
 extern pg_nodiscard List *lappend(List *list, void *datum);
 extern pg_nodiscard List *lappend_int(List *list, int datum);
-- 
2.26.2

Reply via email to