(2018/02/02 19:33), Etsuro Fujita wrote:
(2018/01/25 23:33), Stephen Frost wrote:
I'm afraid a good bit of this patch is now failing to apply. I don't
have much else to say except to echo the performance concern that Amit
Langote raised about expanding the inheritance tree twice.

To address that concern, I'm thinking of redesigning the patch so that it
no longer expands the tree at planning time. I don't have a clear
solution for that yet, but what I have in mind now is to add new FDW
APIs to the executor instead, so that the FDW can 1) create things such
as the query for a remote INSERT, as PlanForeignModify does, and 2)
initialize/end the remote INSERT operation, as BeginForeignModify and
EndForeignModify do, somewhere in the executor.

New FDW APIs I would like to propose for that are:

void
BeginForeignRouting(ModifyTableState *mtstate,
                    ResultRelInfo *resultRelInfo,
                    int partition_index);

Prepare for a tuple-routing operation on a foreign table. This is called from ExecSetupPartitionTupleRouting and ExecInitPartitionInfo.

TupleTableSlot *
ExecForeignRouting(EState *estate,
                   ResultRelInfo *resultRelInfo,
                   TupleTableSlot *slot);

Route one tuple to the foreign table.  This is called from ExecInsert.

void
EndForeignRouting(EState *estate,
                  ResultRelInfo *resultRelInfo);

End the operation and release resources. This is called from ExecCleanupTupleRouting.

Attached are WIP patches for that:

Patch postgres-fdw-refactoring-WIP.patch: refactoring patch for postgres_fdw.c to reduce duplicate code.

Patch foreign-routing-fdwapi-WIP.patch: main patch that adds the new FDW APIs; it applies on top of postgres-fdw-refactoring-WIP.patch and the lazy-initialization-of-partition-info patch [1].

With this change we no longer need to expand the inheritance tree at planning time, so the performance concern should no longer apply. Maybe I'm missing something, though. Early feedback would be greatly appreciated.

Best regards,
Etsuro Fujita

[1] https://www.postgresql.org/message-id/5a8bfb31.6030...@lab.ntt.co.jp
*** a/contrib/file_fdw/output/file_fdw.source
--- b/contrib/file_fdw/output/file_fdw.source
***************
*** 315,321 **** SELECT tableoid::regclass, * FROM p2;
  (0 rows)
  
  COPY pt FROM '@abs_srcdir@/data/list2.bad' with (format 'csv', delimiter ','); -- ERROR
! ERROR:  cannot route inserted tuples to a foreign table
  CONTEXT:  COPY pt, line 2: "1,qux"
  COPY pt FROM '@abs_srcdir@/data/list2.csv' with (format 'csv', delimiter ',');
  SELECT tableoid::regclass, * FROM pt;
--- 315,321 ----
  (0 rows)
  
  COPY pt FROM '@abs_srcdir@/data/list2.bad' with (format 'csv', delimiter ','); -- ERROR
! ERROR:  cannot route copied tuples to a foreign table
  CONTEXT:  COPY pt, line 2: "1,qux"
  COPY pt FROM '@abs_srcdir@/data/list2.csv' with (format 'csv', delimiter ',');
  SELECT tableoid::regclass, * FROM pt;
***************
*** 342,351 **** SELECT tableoid::regclass, * FROM p2;
  (2 rows)
  
  INSERT INTO pt VALUES (1, 'xyzzy'); -- ERROR
! ERROR:  cannot route inserted tuples to a foreign table
  INSERT INTO pt VALUES (2, 'xyzzy');
  UPDATE pt set a = 1 where a = 2; -- ERROR
! ERROR:  cannot route inserted tuples to a foreign table
  SELECT tableoid::regclass, * FROM pt;
   tableoid | a |   b   
  ----------+---+-------
--- 342,351 ----
  (2 rows)
  
  INSERT INTO pt VALUES (1, 'xyzzy'); -- ERROR
! ERROR:  cannot route inserted tuples to foreign table "p1"
  INSERT INTO pt VALUES (2, 'xyzzy');
  UPDATE pt set a = 1 where a = 2; -- ERROR
! ERROR:  cannot route inserted tuples to foreign table "p1"
  SELECT tableoid::regclass, * FROM pt;
   tableoid | a |   b   
  ----------+---+-------
*** a/contrib/postgres_fdw/expected/postgres_fdw.out
--- b/contrib/postgres_fdw/expected/postgres_fdw.out
***************
*** 7364,7369 **** NOTICE:  drop cascades to foreign table bar2
--- 7364,7448 ----
  drop table loct1;
  drop table loct2;
  -- ===================================================================
+ -- test tuple routing for foreign-table partitions
+ -- ===================================================================
+ create table pt (a int, b int, c text) partition by list (a);
+ create table loct1 (a int check (a in (1)), b int, c text, constraint locp1_pkey primary key (b));
+ create table loct2 (b int, c text, a int check (a in (2)), constraint locp2_pkey primary key (b));
+ create foreign table ptp1 partition of pt for values in (1) server loopback options (table_name 'loct1');
+ create foreign table ptp2 partition of pt for values in (2) server loopback options (table_name 'loct2');
+ insert into pt values (1, 1, 'foo');
+ insert into pt values (1, 2, 'bar') returning *;
+  a | b |  c  
+ ---+---+-----
+  1 | 2 | bar
+ (1 row)
+ 
+ insert into pt values (2, 1, 'baz') returning *;
+  a | b |  c  
+ ---+---+-----
+  2 | 1 | baz
+ (1 row)
+ 
+ select tableoid::regclass, * FROM pt;
+  tableoid | a | b |  c  
+ ----------+---+---+-----
+  ptp1     | 1 | 1 | foo
+  ptp1     | 1 | 2 | bar
+  ptp2     | 2 | 1 | baz
+ (3 rows)
+ 
+ select tableoid::regclass, * FROM ptp1;
+  tableoid | a | b |  c  
+ ----------+---+---+-----
+  ptp1     | 1 | 1 | foo
+  ptp1     | 1 | 2 | bar
+ (2 rows)
+ 
+ select tableoid::regclass, * FROM ptp2;
+  tableoid | a | b |  c  
+ ----------+---+---+-----
+  ptp2     | 2 | 1 | baz
+ (1 row)
+ 
+ insert into pt values (2, 1, 'baz');
+ ERROR:  duplicate key value violates unique constraint "locp2_pkey"
+ DETAIL:  Key (b)=(1) already exists.
+ CONTEXT:  Remote SQL command: INSERT INTO public.loct2(a, b, c) VALUES ($1, $2, $3)
+ insert into pt values (2, 1, 'baz') on conflict do nothing;
+ insert into pt values (2, 2, 'qux') on conflict do nothing returning *;
+  a | b |  c  
+ ---+---+-----
+  2 | 2 | qux
+ (1 row)
+ 
+ select tableoid::regclass, * FROM pt;
+  tableoid | a | b |  c  
+ ----------+---+---+-----
+  ptp1     | 1 | 1 | foo
+  ptp1     | 1 | 2 | bar
+  ptp2     | 2 | 1 | baz
+  ptp2     | 2 | 2 | qux
+ (4 rows)
+ 
+ select tableoid::regclass, * FROM ptp1;
+  tableoid | a | b |  c  
+ ----------+---+---+-----
+  ptp1     | 1 | 1 | foo
+  ptp1     | 1 | 2 | bar
+ (2 rows)
+ 
+ select tableoid::regclass, * FROM ptp2;
+  tableoid | a | b |  c  
+ ----------+---+---+-----
+  ptp2     | 2 | 1 | baz
+  ptp2     | 2 | 2 | qux
+ (2 rows)
+ 
+ drop table pt;
+ drop table loct1;
+ drop table loct2;
+ -- ===================================================================
  -- test IMPORT FOREIGN SCHEMA
  -- ===================================================================
  CREATE SCHEMA import_source;
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 319,324 **** static TupleTableSlot *postgresExecForeignDelete(EState *estate,
--- 319,332 ----
  						  TupleTableSlot *planSlot);
  static void postgresEndForeignModify(EState *estate,
  						 ResultRelInfo *resultRelInfo);
+ static void postgresBeginForeignRouting(ModifyTableState *mtstate,
+ 							ResultRelInfo *resultRelInfo,
+ 							int partition_index);
+ static TupleTableSlot *postgresExecForeignRouting(EState *estate,
+ 						   ResultRelInfo *resultRelInfo,
+ 						   TupleTableSlot *slot);
+ static void postgresEndForeignRouting(EState *estate,
+ 						  ResultRelInfo *resultRelInfo);
  static int	postgresIsForeignRelUpdatable(Relation rel);
  static bool postgresPlanDirectModify(PlannerInfo *root,
  						 ModifyTable *plan,
***************
*** 473,478 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
--- 481,489 ----
  	routine->ExecForeignUpdate = postgresExecForeignUpdate;
  	routine->ExecForeignDelete = postgresExecForeignDelete;
  	routine->EndForeignModify = postgresEndForeignModify;
+ 	routine->BeginForeignRouting = postgresBeginForeignRouting;
+ 	routine->ExecForeignRouting = postgresExecForeignRouting;
+ 	routine->EndForeignRouting = postgresEndForeignRouting;
  	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
  	routine->PlanDirectModify = postgresPlanDirectModify;
  	routine->BeginDirectModify = postgresBeginDirectModify;
***************
*** 1857,1862 **** postgresEndForeignModify(EState *estate,
--- 1868,1986 ----
  }
  
  /*
+  * postgresBeginForeignRouting
+  *		Begin a row-routing operation on a foreign table
+  */
+ static void
+ postgresBeginForeignRouting(ModifyTableState *mtstate,
+ 							ResultRelInfo *resultRelInfo,
+ 							int partition_index)
+ {
+ 	PgFdwModifyState *fmstate;
+ 	Relation	rel = resultRelInfo->ri_RelationDesc;
+ 	TupleDesc	tupdesc = RelationGetDescr(rel);
+ 	int			attnum;
+ 	RangeTblEntry *rte;
+ 	Query	   *query;
+ 	PlannerInfo *root;
+ 	StringInfoData sql;
+ 	List	   *targetAttrs = NIL;
+ 	List	   *retrieved_attrs = NIL;
+ 	bool		doNothing = false;
+ 
+ 	initStringInfo(&sql);
+ 
+ 	/* Set up largely-dummy planner state */
+ 	rte = makeNode(RangeTblEntry);
+ 	rte->rtekind = RTE_RELATION;
+ 	rte->relid = RelationGetRelid(rel);
+ 	rte->relkind = RELKIND_FOREIGN_TABLE;
+ 	query = makeNode(Query);
+ 	query->commandType = CMD_INSERT;
+ 	query->resultRelation = 1;
+ 	query->rtable = list_make1(rte);
+ 	root = makeNode(PlannerInfo);
+ 	root->parse = query;
+ 
+ 	/* We transmit all columns that are defined in the foreign table. */
+ 	for (attnum = 1; attnum <= tupdesc->natts; attnum++)
+ 	{
+ 		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+ 
+ 		if (!attr->attisdropped)
+ 			targetAttrs = lappend_int(targetAttrs, attnum);
+ 	}
+ 
+ 	/* We only support DO NOTHING without an inference specification. */
+ 	if (mtstate->mt_onconflict == ONCONFLICT_NOTHING)
+ 		doNothing = true;
+ 	else if (mtstate->mt_onconflict != ONCONFLICT_NONE)
+ 		elog(ERROR, "unexpected ON CONFLICT specification: %d",
+ 			 (int) mtstate->mt_onconflict);
+ 
+ 	/* Construct the SQL command string. */
+ 	deparseInsertSql(&sql, root, 1, rel, targetAttrs, doNothing,
+ 					 resultRelInfo->ri_returningList, &retrieved_attrs);
+ 
+ 	/* Construct an execution state. */
+ 	fmstate = create_fdw_modify_state(mtstate,
+ 									  resultRelInfo,
+ 									  CMD_INSERT,
+ 									  0,	/* dummy subplan index */
+ 									  sql.data,
+ 									  targetAttrs,
+ 									  retrieved_attrs != NIL,
+ 									  retrieved_attrs);
+ 
+ 	resultRelInfo->ri_FdwState = fmstate;
+ }
+ 
+ /*
+  * postgresExecForeignRouting
+  *		Route one row to a foreign table
+  */
+ static TupleTableSlot *
+ postgresExecForeignRouting(EState *estate,
+ 						   ResultRelInfo *resultRelInfo,
+ 						   TupleTableSlot *slot)
+ {
+ 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+ 	const char **p_values;
+ 	int			n_rows;
+ 
+ 	/* Set up the prepared statement on the remote server, if we didn't yet */
+ 	if (!fmstate->p_name)
+ 		prepare_foreign_modify(fmstate);
+ 
+ 	/* Convert parameters needed by prepared statement to text form */
+ 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+ 
+ 	/* Execute the prepared statement and fetch RETURNING tuple if any */
+ 	n_rows = execute_prep_stmt(fmstate, p_values, slot);
+ 
+ 	MemoryContextReset(fmstate->temp_cxt);
+ 
+ 	/* Return NULL if nothing was inserted on the remote end */
+ 	return (n_rows > 0) ? slot : NULL;
+ }
+ 
+ /*
+  * postgresEndForeignRouting
+  *		Finish a row-routing operation on a foreign table
+  */
+ static void
+ postgresEndForeignRouting(EState *estate,
+ 						  ResultRelInfo *resultRelInfo)
+ {
+ 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+ 
+ 	Assert(fmstate != NULL);
+ 
+ 	/* Destroy the execution state. */
+ 	finish_foreign_modify(fmstate);
+ }
+ 
+ /*
   * postgresIsForeignRelUpdatable
   *		Determine whether a foreign table supports INSERT, UPDATE and/or
   *		DELETE.
*** a/contrib/postgres_fdw/sql/postgres_fdw.sql
--- b/contrib/postgres_fdw/sql/postgres_fdw.sql
***************
*** 1759,1764 **** drop table loct1;
--- 1759,1794 ----
  drop table loct2;
  
  -- ===================================================================
+ -- test tuple routing for foreign-table partitions
+ -- ===================================================================
+ 
+ create table pt (a int, b int, c text) partition by list (a);
+ create table loct1 (a int check (a in (1)), b int, c text, constraint locp1_pkey primary key (b));
+ create table loct2 (b int, c text, a int check (a in (2)), constraint locp2_pkey primary key (b));
+ create foreign table ptp1 partition of pt for values in (1) server loopback options (table_name 'loct1');
+ create foreign table ptp2 partition of pt for values in (2) server loopback options (table_name 'loct2');
+ 
+ insert into pt values (1, 1, 'foo');
+ insert into pt values (1, 2, 'bar') returning *;
+ insert into pt values (2, 1, 'baz') returning *;
+ 
+ select tableoid::regclass, * FROM pt;
+ select tableoid::regclass, * FROM ptp1;
+ select tableoid::regclass, * FROM ptp2;
+ 
+ insert into pt values (2, 1, 'baz');
+ insert into pt values (2, 1, 'baz') on conflict do nothing;
+ insert into pt values (2, 2, 'qux') on conflict do nothing returning *;
+ 
+ select tableoid::regclass, * FROM pt;
+ select tableoid::regclass, * FROM ptp1;
+ select tableoid::regclass, * FROM ptp2;
+ 
+ drop table pt;
+ drop table loct1;
+ drop table loct2;
+ 
+ -- ===================================================================
  -- test IMPORT FOREIGN SCHEMA
  -- ===================================================================
  
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 2616,2626 **** CopyFrom(CopyState cstate)
  				Assert(resultRelInfo != NULL);
  			}
  
! 			/* We do not yet have a way to insert into a foreign partition */
  			if (resultRelInfo->ri_FdwRoutine)
  				ereport(ERROR,
  						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 						 errmsg("cannot route inserted tuples to a foreign table")));
  
  			/*
  			 * For ExecInsertIndexTuples() to work on the partition's indexes
--- 2616,2626 ----
  				Assert(resultRelInfo != NULL);
  			}
  
! 			/* We do not yet have a way to copy into a foreign partition */
  			if (resultRelInfo->ri_FdwRoutine)
  				ereport(ERROR,
  						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 						 errmsg("cannot route copied tuples to a foreign table")));
  
  			/*
  			 * For ExecInsertIndexTuples() to work on the partition's indexes
***************
*** 2814,2820 **** CopyFrom(CopyState cstate)
  
  	/* Close all the partitioned tables, leaf partitions, and their indices */
  	if (cstate->partition_tuple_routing)
! 		ExecCleanupTupleRouting(cstate->partition_tuple_routing);
  
  	/* Close any trigger target relations */
  	ExecCleanUpTriggerState(estate);
--- 2814,2820 ----
  
  	/* Close all the partitioned tables, leaf partitions, and their indices */
  	if (cstate->partition_tuple_routing)
! 		ExecCleanupTupleRouting(NULL, cstate->partition_tuple_routing);
  
  	/* Close any trigger target relations */
  	ExecCleanUpTriggerState(estate);
*** a/src/backend/executor/execMain.c
--- b/src/backend/executor/execMain.c
***************
*** 1172,1189 **** CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation)
  			switch (operation)
  			{
  				case CMD_INSERT:
- 
- 					/*
- 					 * If foreign partition to do tuple-routing for, skip the
- 					 * check; it's disallowed elsewhere.
- 					 */
  					if (resultRelInfo->ri_PartitionRoot)
! 						break;
! 					if (fdwroutine->ExecForeignInsert == NULL)
! 						ereport(ERROR,
! 								(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 								 errmsg("cannot insert into foreign table \"%s\"",
! 										RelationGetRelationName(resultRel))));
  					if (fdwroutine->IsForeignRelUpdatable != NULL &&
  						(fdwroutine->IsForeignRelUpdatable(resultRel) & (1 << CMD_INSERT)) == 0)
  						ereport(ERROR,
--- 1172,1193 ----
  			switch (operation)
  			{
  				case CMD_INSERT:
  					if (resultRelInfo->ri_PartitionRoot)
! 					{
! 						if (fdwroutine->ExecForeignRouting == NULL)
! 							ereport(ERROR,
! 									(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 									 errmsg("cannot route inserted tuples to foreign table \"%s\"",
! 											RelationGetRelationName(resultRel))));
! 					}
! 					else
! 					{
! 						if (fdwroutine->ExecForeignInsert == NULL)
! 							ereport(ERROR,
! 									(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 									 errmsg("cannot insert into foreign table \"%s\"",
! 											RelationGetRelationName(resultRel))));
! 					}
  					if (fdwroutine->IsForeignRelUpdatable != NULL &&
  						(fdwroutine->IsForeignRelUpdatable(resultRel) & (1 << CMD_INSERT)) == 0)
  						ereport(ERROR,
***************
*** 1364,1369 **** InitResultRelInfo(ResultRelInfo *resultRelInfo,
--- 1368,1374 ----
  
  	resultRelInfo->ri_PartitionCheck = partition_check;
  	resultRelInfo->ri_PartitionRoot = partition_root;
+ 	resultRelInfo->ri_PartitionIsValid = false;
  }
  
  /*
*** a/src/backend/executor/execPartition.c
--- b/src/backend/executor/execPartition.c
***************
*** 17,22 ****
--- 17,23 ----
  #include "catalog/pg_inherits_fn.h"
  #include "executor/execPartition.h"
  #include "executor/executor.h"
+ #include "foreign/fdwapi.h"
  #include "mb/pg_wchar.h"
  #include "miscadmin.h"
  #include "utils/lsyscache.h"
***************
*** 162,173 **** ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
  							   gettext_noop("could not convert row type"));
  
  				/*
! 				 * Verify result relation is a valid target for an INSERT.  An
! 				 * UPDATE of a partition-key becomes a DELETE+INSERT operation,
! 				 * so this check is required even when the operation is
! 				 * CMD_UPDATE.
  				 */
! 				CheckValidResultRel(leaf_part_rri, CMD_INSERT);
  			}
  		}
  
--- 163,175 ----
  							   gettext_noop("could not convert row type"));
  
  				/*
! 				 * Also let the FDW init itself if this partition is foreign.
  				 */
! 				if (leaf_part_rri->ri_FdwRoutine != NULL &&
! 					leaf_part_rri->ri_FdwRoutine->BeginForeignRouting != NULL)
! 					leaf_part_rri->ri_FdwRoutine->BeginForeignRouting(mtstate,
! 																	  leaf_part_rri,
! 																	  i);
  			}
  		}
  
***************
*** 342,354 **** ExecInitPartitionInfo(ModifyTableState *mtstate,
  					  estate->es_instrument);
  
  	/*
- 	 * Verify result relation is a valid target for an INSERT.  An UPDATE
- 	 * of a partition-key becomes a DELETE+INSERT operation, so this check
- 	 * is still required when the operation is CMD_UPDATE.
- 	 */
- 	CheckValidResultRel(leaf_part_rri, CMD_INSERT);
- 
- 	/*
  	 * Since we've just initialized this ResultRelInfo, it's not in
  	 * any list attached to the estate as yet.  Add it, so that it can
  	 * be found later.
--- 344,349 ----
***************
*** 468,473 **** ExecInitPartitionInfo(ModifyTableState *mtstate,
--- 463,469 ----
  		returningList = map_partition_varattnos(returningList, firstVarno,
  												partrel, firstResultRel,
  												NULL);
+ 		leaf_part_rri->ri_returningList = returningList;
  
  		/*
  		 * Initialize the projection itself.
***************
*** 498,503 **** ExecInitPartitionInfo(ModifyTableState *mtstate,
--- 494,509 ----
  
  	MemoryContextSwitchTo(oldContext);
  
+ 	/*
+ 	 * Also let the FDW init itself if this partition is foreign.
+ 	 */
+ 	if (mtstate &&
+ 		leaf_part_rri->ri_FdwRoutine != NULL &&
+ 		leaf_part_rri->ri_FdwRoutine->BeginForeignRouting != NULL)
+ 		leaf_part_rri->ri_FdwRoutine->BeginForeignRouting(mtstate,
+ 														  leaf_part_rri,
+ 														  partidx);
+ 
  	return leaf_part_rri;
  }
  
***************
*** 603,609 **** ConvertPartitionTupleSlot(TupleConversionMap *map,
   * Close all the partitioned tables, leaf partitions, and their indices.
   */
  void
! ExecCleanupTupleRouting(PartitionTupleRouting *proute)
  {
  	int			i;
  	int			subplan_index = 0;
--- 609,616 ----
   * Close all the partitioned tables, leaf partitions, and their indices.
   */
  void
! ExecCleanupTupleRouting(ModifyTableState *node,
! 						PartitionTupleRouting *proute)
  {
  	int			i;
  	int			subplan_index = 0;
***************
*** 632,637 **** ExecCleanupTupleRouting(PartitionTupleRouting *proute)
--- 639,653 ----
  			continue;
  
  		/*
+ 		 * Allow any FDWs to shut down
+ 		 */
+ 		if (node &&
+ 			resultRelInfo->ri_FdwRoutine != NULL &&
+ 			resultRelInfo->ri_FdwRoutine->EndForeignRouting != NULL)
+ 			resultRelInfo->ri_FdwRoutine->EndForeignRouting(node->ps.state,
+ 															resultRelInfo);
+ 
+ 		/*
  		 * If this result rel is one of the UPDATE subplan result rels, let
  		 * ExecEndPlan() close it. For INSERT or COPY,
  		 * proute->subplan_partition_offsets will always be NULL. Note that
*** a/src/backend/executor/nodeModifyTable.c
--- b/src/backend/executor/nodeModifyTable.c
***************
*** 319,329 **** ExecInsert(ModifyTableState *mtstate,
  			Assert(resultRelInfo != NULL);
  		}
  
! 		/* We do not yet have a way to insert into a foreign partition */
! 		if (resultRelInfo->ri_FdwRoutine)
! 			ereport(ERROR,
! 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 					 errmsg("cannot route inserted tuples to a foreign table")));
  
  		/* For ExecInsertIndexTuples() to work on the partition's indexes */
  		estate->es_result_relation_info = resultRelInfo;
--- 319,333 ----
  			Assert(resultRelInfo != NULL);
  		}
  
! 		/*
! 		 * Verify the specified partition is a valid target for tuple routing
! 		 * if not already done.
! 		 */
! 		if (!resultRelInfo->ri_PartitionIsValid)
! 		{
! 			CheckValidResultRel(resultRelInfo, CMD_INSERT);
! 			resultRelInfo->ri_PartitionIsValid = true;
! 		}
  
  		/* For ExecInsertIndexTuples() to work on the partition's indexes */
  		estate->es_result_relation_info = resultRelInfo;
***************
*** 433,442 **** ExecInsert(ModifyTableState *mtstate,
  		/*
  		 * insert into foreign table: let the FDW do it
  		 */
! 		slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
! 															   resultRelInfo,
! 															   slot,
! 															   planSlot);
  
  		if (slot == NULL)		/* "do nothing" */
  			return NULL;
--- 437,451 ----
  		/*
  		 * insert into foreign table: let the FDW do it
  		 */
! 		if (resultRelInfo->ri_PartitionRoot)
! 			slot = resultRelInfo->ri_FdwRoutine->ExecForeignRouting(estate,
! 																	resultRelInfo,
! 																	slot);
! 		else
! 			slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
! 																   resultRelInfo,
! 																   slot,
! 																   planSlot);
  
  		if (slot == NULL)		/* "do nothing" */
  			return NULL;
***************
*** 2311,2316 **** ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
--- 2320,2326 ----
  		{
  			List	   *rlist = (List *) lfirst(l);
  
+ 			resultRelInfo->ri_returningList = rlist;
  			resultRelInfo->ri_projectReturning =
  				ExecBuildProjectionInfo(rlist, econtext, slot, &mtstate->ps,
  										resultRelInfo->ri_RelationDesc->rd_att);
***************
*** 2569,2575 **** ExecEndModifyTable(ModifyTableState *node)
  
  	/* Close all the partitioned tables, leaf partitions, and their indices */
  	if (node->mt_partition_tuple_routing)
! 		ExecCleanupTupleRouting(node->mt_partition_tuple_routing);
  
  	/*
  	 * Free the exprcontext
--- 2579,2585 ----
  
  	/* Close all the partitioned tables, leaf partitions, and their indices */
  	if (node->mt_partition_tuple_routing)
! 		ExecCleanupTupleRouting(node, node->mt_partition_tuple_routing);
  
  	/*
  	 * Free the exprcontext
*** a/src/include/executor/execPartition.h
--- b/src/include/executor/execPartition.h
***************
*** 121,126 **** extern HeapTuple ConvertPartitionTupleSlot(TupleConversionMap *map,
  						  HeapTuple tuple,
  						  TupleTableSlot *new_slot,
  						  TupleTableSlot **p_my_slot);
! extern void ExecCleanupTupleRouting(PartitionTupleRouting *proute);
  
  #endif							/* EXECPARTITION_H */
--- 121,127 ----
  						  HeapTuple tuple,
  						  TupleTableSlot *new_slot,
  						  TupleTableSlot **p_my_slot);
! extern void ExecCleanupTupleRouting(ModifyTableState *node,
! 						PartitionTupleRouting *proute);
  
  #endif							/* EXECPARTITION_H */
*** a/src/include/foreign/fdwapi.h
--- b/src/include/foreign/fdwapi.h
***************
*** 97,102 **** typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate,
--- 97,113 ----
  typedef void (*EndForeignModify_function) (EState *estate,
  										   ResultRelInfo *rinfo);
  
+ typedef void (*BeginForeignRouting_function) (ModifyTableState *mtstate,
+ 											  ResultRelInfo *rinfo,
+ 											  int partition_index);
+ 
+ typedef TupleTableSlot *(*ExecForeignRouting_function) (EState *estate,
+ 														ResultRelInfo *rinfo,
+ 														TupleTableSlot *slot);
+ 
+ typedef void (*EndForeignRouting_function) (EState *estate,
+ 											ResultRelInfo *rinfo);
+ 
  typedef int (*IsForeignRelUpdatable_function) (Relation rel);
  
  typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
***************
*** 204,209 **** typedef struct FdwRoutine
--- 215,223 ----
  	ExecForeignUpdate_function ExecForeignUpdate;
  	ExecForeignDelete_function ExecForeignDelete;
  	EndForeignModify_function EndForeignModify;
+ 	BeginForeignRouting_function BeginForeignRouting;
+ 	ExecForeignRouting_function ExecForeignRouting;
+ 	EndForeignRouting_function EndForeignRouting;
  	IsForeignRelUpdatable_function IsForeignRelUpdatable;
  	PlanDirectModify_function PlanDirectModify;
  	BeginDirectModify_function BeginDirectModify;
*** a/src/include/nodes/execnodes.h
--- b/src/include/nodes/execnodes.h
***************
*** 403,408 **** typedef struct ResultRelInfo
--- 403,411 ----
  	/* list of WithCheckOption expr states */
  	List	   *ri_WithCheckOptionExprs;
  
+ 	/* list of RETURNING expressions */
+ 	List	   *ri_returningList;
+ 
  	/* array of constraint-checking expr states */
  	ExprState **ri_ConstraintExprs;
  
***************
*** 426,431 **** typedef struct ResultRelInfo
--- 429,437 ----
  
  	/* relation descriptor for root partitioned table */
  	Relation	ri_PartitionRoot;
+ 
+ 	/* true if valid target for tuple routing */
+ 	bool		ri_PartitionIsValid;
  } ResultRelInfo;
  
  /* ----------------
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 375,386 **** static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 375,398 ----
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
+ static PgFdwModifyState *create_fdw_modify_state(ModifyTableState *mtstate,
+ 						ResultRelInfo *resultRelInfo,
+ 						CmdType operation,
+ 						int subplan_index,
+ 						char *query,
+ 						List *target_attrs,
+ 						bool has_returning,
+ 						List *retrieved_attrs);
  static void prepare_foreign_modify(PgFdwModifyState *fmstate);
  static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
  						 ItemPointer tupleid,
  						 TupleTableSlot *slot);
+ static int execute_prep_stmt(PgFdwModifyState *fmstate,
+ 				  const char **p_values,
+ 				  TupleTableSlot *slot);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
+ static void finish_foreign_modify(PgFdwModifyState *fmstate);
  static List *build_remote_returning(Index rtindex, Relation rel,
  					   List *returningList);
  static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
***************
*** 1678,1695 **** postgresBeginForeignModify(ModifyTableState *mtstate,
  						   int eflags)
  {
  	PgFdwModifyState *fmstate;
! 	EState	   *estate = mtstate->ps.state;
! 	CmdType		operation = mtstate->operation;
! 	Relation	rel = resultRelInfo->ri_RelationDesc;
! 	RangeTblEntry *rte;
! 	Oid			userid;
! 	ForeignTable *table;
! 	UserMapping *user;
! 	AttrNumber	n_params;
! 	Oid			typefnoid;
! 	bool		isvarlena;
! 	ListCell   *lc;
! 	TupleDesc	tupdesc = RelationGetDescr(rel);
  
  	/*
  	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
--- 1690,1699 ----
  						   int eflags)
  {
  	PgFdwModifyState *fmstate;
! 	char	   *query;
! 	List	   *target_attrs;
! 	bool		has_returning;
! 	List	   *retrieved_attrs;
  
  	/*
  	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
***************
*** 1698,1779 **** postgresBeginForeignModify(ModifyTableState *mtstate,
  	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
  		return;
  
- 	/* Begin constructing PgFdwModifyState. */
- 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
- 	fmstate->rel = rel;
- 
- 	/*
- 	 * Identify which user to do the remote access as.  This should match what
- 	 * ExecCheckRTEPerms() does.
- 	 */
- 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
- 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
- 
- 	/* Get info about foreign table. */
- 	table = GetForeignTable(RelationGetRelid(rel));
- 	user = GetUserMapping(userid, table->serverid);
- 
- 	/* Open connection; report that we'll create a prepared statement. */
- 	fmstate->conn = GetConnection(user, true);
- 	fmstate->p_name = NULL;		/* prepared statement not made yet */
- 
  	/* Deconstruct fdw_private data. */
! 	fmstate->query = strVal(list_nth(fdw_private,
! 									 FdwModifyPrivateUpdateSql));
! 	fmstate->target_attrs = (List *) list_nth(fdw_private,
! 											  FdwModifyPrivateTargetAttnums);
! 	fmstate->has_returning = intVal(list_nth(fdw_private,
! 											 FdwModifyPrivateHasReturning));
! 	fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
! 												 FdwModifyPrivateRetrievedAttrs);
! 
! 	/* Create context for per-tuple temp workspace. */
! 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
! 											  "postgres_fdw temporary data",
! 											  ALLOCSET_SMALL_SIZES);
! 
! 	/* Prepare for input conversion of RETURNING results. */
! 	if (fmstate->has_returning)
! 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 
! 	/* Prepare for output conversion of parameters used in prepared stmt. */
! 	n_params = list_length(fmstate->target_attrs) + 1;
! 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
! 	fmstate->p_nums = 0;
! 
! 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
! 	{
! 		/* Find the ctid resjunk column in the subplan's result */
! 		Plan	   *subplan = mtstate->mt_plans[subplan_index]->plan;
! 
! 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
! 														  "ctid");
! 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
! 			elog(ERROR, "could not find junk ctid column");
! 
! 		/* First transmittable parameter will be ctid */
! 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
! 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
! 		fmstate->p_nums++;
! 	}
! 
! 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
! 	{
! 		/* Set up for remaining transmittable parameters */
! 		foreach(lc, fmstate->target_attrs)
! 		{
! 			int			attnum = lfirst_int(lc);
! 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
  
! 			Assert(!attr->attisdropped);
! 
! 			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
! 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
! 			fmstate->p_nums++;
! 		}
! 	}
! 
! 	Assert(fmstate->p_nums <= n_params);
  
  	resultRelInfo->ri_FdwState = fmstate;
  }
--- 1702,1725 ----
  	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
  		return;
  
  	/* Deconstruct fdw_private data. */
! 	query = strVal(list_nth(fdw_private,
! 							FdwModifyPrivateUpdateSql));
! 	target_attrs = (List *) list_nth(fdw_private,
! 									 FdwModifyPrivateTargetAttnums);
! 	has_returning = intVal(list_nth(fdw_private,
! 									FdwModifyPrivateHasReturning));
! 	retrieved_attrs = (List *) list_nth(fdw_private,
! 										FdwModifyPrivateRetrievedAttrs);
  
! 	/* Construct an execution state. */
! 	fmstate = create_fdw_modify_state(mtstate, resultRelInfo,
! 									  mtstate->operation,
! 									  subplan_index,
! 									  query,
! 									  target_attrs,
! 									  has_returning,
! 									  retrieved_attrs);
  
  	resultRelInfo->ri_FdwState = fmstate;
  }
***************
*** 1790,1796 **** postgresExecForeignInsert(EState *estate,
  {
  	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
  	const char **p_values;
- 	PGresult   *res;
  	int			n_rows;
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
--- 1736,1741 ----
***************
*** 1800,1840 **** postgresExecForeignInsert(EState *estate,
  	/* Convert parameters needed by prepared statement to text form */
  	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
! 	/*
! 	 * Execute the prepared statement.
! 	 */
! 	if (!PQsendQueryPrepared(fmstate->conn,
! 							 fmstate->p_name,
! 							 fmstate->p_nums,
! 							 p_values,
! 							 NULL,
! 							 NULL,
! 							 0))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
! 
! 	/*
! 	 * Get the result, and check for success.
! 	 *
! 	 * We don't use a PG_TRY block here, so be careful not to throw error
! 	 * without releasing the PGresult.
! 	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
! 	if (PQresultStatus(res) !=
! 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
! 
! 	/* Check number of rows affected, and fetch RETURNING tuple if any */
! 	if (fmstate->has_returning)
! 	{
! 		n_rows = PQntuples(res);
! 		if (n_rows > 0)
! 			store_returning_result(fmstate, slot, res);
! 	}
! 	else
! 		n_rows = atoi(PQcmdTuples(res));
! 
! 	/* And clean up */
! 	PQclear(res);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
--- 1745,1752 ----
  	/* Convert parameters needed by prepared statement to text form */
  	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
! 	/* Execute the prepared statement and fetch RETURNING tuple if any */
! 	n_rows = execute_prep_stmt(fmstate, p_values, slot);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
***************
*** 1856,1862 **** postgresExecForeignUpdate(EState *estate,
  	Datum		datum;
  	bool		isNull;
  	const char **p_values;
- 	PGresult   *res;
  	int			n_rows;
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
--- 1768,1773 ----
***************
*** 1876,1916 **** postgresExecForeignUpdate(EState *estate,
  										(ItemPointer) DatumGetPointer(datum),
  										slot);
  
! 	/*
! 	 * Execute the prepared statement.
! 	 */
! 	if (!PQsendQueryPrepared(fmstate->conn,
! 							 fmstate->p_name,
! 							 fmstate->p_nums,
! 							 p_values,
! 							 NULL,
! 							 NULL,
! 							 0))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
! 
! 	/*
! 	 * Get the result, and check for success.
! 	 *
! 	 * We don't use a PG_TRY block here, so be careful not to throw error
! 	 * without releasing the PGresult.
! 	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
! 	if (PQresultStatus(res) !=
! 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
! 
! 	/* Check number of rows affected, and fetch RETURNING tuple if any */
! 	if (fmstate->has_returning)
! 	{
! 		n_rows = PQntuples(res);
! 		if (n_rows > 0)
! 			store_returning_result(fmstate, slot, res);
! 	}
! 	else
! 		n_rows = atoi(PQcmdTuples(res));
! 
! 	/* And clean up */
! 	PQclear(res);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
--- 1787,1794 ----
  										(ItemPointer) DatumGetPointer(datum),
  										slot);
  
! 	/* Execute the prepared statement and fetch RETURNING tuple if any */
! 	n_rows = execute_prep_stmt(fmstate, p_values, slot);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
***************
*** 1932,1938 **** postgresExecForeignDelete(EState *estate,
  	Datum		datum;
  	bool		isNull;
  	const char **p_values;
- 	PGresult   *res;
  	int			n_rows;
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
--- 1810,1815 ----
***************
*** 1952,1992 **** postgresExecForeignDelete(EState *estate,
  										(ItemPointer) DatumGetPointer(datum),
  										NULL);
  
! 	/*
! 	 * Execute the prepared statement.
! 	 */
! 	if (!PQsendQueryPrepared(fmstate->conn,
! 							 fmstate->p_name,
! 							 fmstate->p_nums,
! 							 p_values,
! 							 NULL,
! 							 NULL,
! 							 0))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
! 
! 	/*
! 	 * Get the result, and check for success.
! 	 *
! 	 * We don't use a PG_TRY block here, so be careful not to throw error
! 	 * without releasing the PGresult.
! 	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
! 	if (PQresultStatus(res) !=
! 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
! 
! 	/* Check number of rows affected, and fetch RETURNING tuple if any */
! 	if (fmstate->has_returning)
! 	{
! 		n_rows = PQntuples(res);
! 		if (n_rows > 0)
! 			store_returning_result(fmstate, slot, res);
! 	}
! 	else
! 		n_rows = atoi(PQcmdTuples(res));
! 
! 	/* And clean up */
! 	PQclear(res);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
--- 1829,1836 ----
  										(ItemPointer) DatumGetPointer(datum),
  										NULL);
  
! 	/* Execute the prepared statement and fetch RETURNING tuple if any */
! 	n_rows = execute_prep_stmt(fmstate, p_values, slot);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
***************
*** 2008,2035 **** postgresEndForeignModify(EState *estate,
  	if (fmstate == NULL)
  		return;
  
! 	/* If we created a prepared statement, destroy it */
! 	if (fmstate->p_name)
! 	{
! 		char		sql[64];
! 		PGresult   *res;
! 
! 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
! 
! 		/*
! 		 * We don't use a PG_TRY block here, so be careful not to throw error
! 		 * without releasing the PGresult.
! 		 */
! 		res = pgfdw_exec_query(fmstate->conn, sql);
! 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
! 		PQclear(res);
! 		fmstate->p_name = NULL;
! 	}
! 
! 	/* Release remote connection */
! 	ReleaseConnection(fmstate->conn);
! 	fmstate->conn = NULL;
  }
  
  /*
--- 1852,1859 ----
  	if (fmstate == NULL)
  		return;
  
! 	/* Destroy the execution state. */
! 	finish_foreign_modify(fmstate);
  }
  
  /*
***************
*** 3217,3222 **** close_cursor(PGconn *conn, unsigned int cursor_number)
--- 3041,3150 ----
  }
  
  /*
+  * create_fdw_modify_state
+  *		Allocate and return the PgFdwModifyState for one foreign-table
+  *		INSERT/UPDATE/DELETE; subplan_index is used only for UPDATE/DELETE.
+  */
+ static PgFdwModifyState *
+ create_fdw_modify_state(ModifyTableState *mtstate,
+ 						ResultRelInfo *resultRelInfo,
+ 						CmdType operation,
+ 						int subplan_index,
+ 						char *query,
+ 						List *target_attrs,
+ 						bool has_returning,
+ 						List *retrieved_attrs)
+ {
+ 	PgFdwModifyState *fmstate;
+ 	EState	   *estate = mtstate->ps.state;
+ 	Relation	rel = resultRelInfo->ri_RelationDesc;
+ 	RangeTblEntry *rte;
+ 	Oid			userid;
+ 	ForeignTable *table;
+ 	UserMapping *user;
+ 	AttrNumber	n_params;
+ 	Oid			typefnoid;
+ 	bool		isvarlena;
+ 	ListCell   *lc;
+ 	TupleDesc	tupdesc = RelationGetDescr(rel);
+ 
+ 	/* Start with a zeroed state struct and remember the target relation. */
+ 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
+ 	fmstate->rel = rel;
+ 
+ 	/*
+ 	 * Identify which user to do the remote access as.  This should match what
+ 	 * ExecCheckRTEPerms() does.
+ 	 */
+ 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
+ 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+ 
+ 	/* Look up the foreign table and the user mapping for its server. */
+ 	table = GetForeignTable(RelationGetRelid(rel));
+ 	user = GetUserMapping(userid, table->serverid);
+ 
+ 	/* Open connection; report that we'll create a prepared statement. */
+ 	fmstate->conn = GetConnection(user, true);
+ 	fmstate->p_name = NULL;		/* prepared statement not made yet */
+ 
+ 	/* Save the caller-supplied remote query information (pointers, no copy). */
+ 	fmstate->query = query;
+ 	fmstate->target_attrs = target_attrs;
+ 	fmstate->has_returning = has_returning;
+ 	fmstate->retrieved_attrs = retrieved_attrs;
+ 
+ 	/* Create context for per-tuple temp workspace, under the query context. */
+ 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ 											  "postgres_fdw temporary data",
+ 											  ALLOCSET_SMALL_SIZES);
+ 
+ 	/* Prepare for input conversion of RETURNING results. */
+ 	if (fmstate->has_returning)
+ 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ 
+ 	/* Output-function array: one entry per target attr, plus one for ctid. */
+ 	n_params = list_length(fmstate->target_attrs) + 1;
+ 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
+ 	fmstate->p_nums = 0;
+ 
+ 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ 	{
+ 		/* Find the ctid resjunk column in the subplan's result */
+ 		Plan	   *subplan = mtstate->mt_plans[subplan_index]->plan;
+ 
+ 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
+ 														  "ctid");
+ 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
+ 			elog(ERROR, "could not find junk ctid column");
+ 
+ 		/* First transmittable parameter will be ctid */
+ 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ 		fmstate->p_nums++;
+ 	}
+ 
+ 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
+ 	{
+ 		/* Set up for remaining transmittable parameters, in target_attrs order */
+ 		foreach(lc, fmstate->target_attrs)
+ 		{
+ 			int			attnum = lfirst_int(lc);
+ 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+ 
+ 			Assert(!attr->attisdropped);
+ 
+ 			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+ 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ 			fmstate->p_nums++;
+ 		}
+ 	}
+ 
+ 	Assert(fmstate->p_nums <= n_params);
+ 
+ 	return fmstate;
+ }
+ 
+ /*
   * prepare_foreign_modify
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
   */
***************
*** 3326,3331 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
--- 3254,3310 ----
  }
  
  /*
+  * execute_prep_stmt
+  *		Execute the prepared statement with p_values; return the remote row count, handing any RETURNING tuple to store_returning_result()
+  */
+ static int
+ execute_prep_stmt(PgFdwModifyState *fmstate,
+ 				  const char **p_values,
+ 				  TupleTableSlot *slot)
+ {
+ 	PGresult   *res;
+ 	int			n_rows;
+ 
+ 	/*
+ 	 * Send the prepared statement, passing p_nums parameters from p_values.
+ 	 */
+ 	if (!PQsendQueryPrepared(fmstate->conn,
+ 							 fmstate->p_name,
+ 							 fmstate->p_nums,
+ 							 p_values,
+ 							 NULL,
+ 							 NULL,
+ 							 0))
+ 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ 
+ 	/*
+ 	 * Get the result, and check for success.
+ 	 *
+ 	 * We don't use a PG_TRY block here, so be careful not to throw error
+ 	 * without releasing the PGresult.
+ 	 */
+ 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ 	if (PQresultStatus(res) !=
+ 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+ 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ 
+ 	/* Row count: result tuples if RETURNING was used, else the command's count */
+ 	if (fmstate->has_returning)
+ 	{
+ 		n_rows = PQntuples(res);
+ 		if (n_rows > 0)
+ 			store_returning_result(fmstate, slot, res);
+ 	}
+ 	else
+ 		n_rows = atoi(PQcmdTuples(res));
+ 
+ 	/* And clean up */
+ 	PQclear(res);
+ 
+ 	return n_rows;
+ }
+ 
+ /*
   * store_returning_result
   *		Store the result of a RETURNING clause
   *
***************
*** 3359,3364 **** store_returning_result(PgFdwModifyState *fmstate,
--- 3338,3376 ----
  }
  
  /*
+  * finish_foreign_modify
+  *		Deallocate the remote prepared statement, if one was made, and release the connection; fmstate must not be NULL.
+  */
+ static void
+ finish_foreign_modify(PgFdwModifyState *fmstate)
+ {
+ 	Assert(fmstate != NULL);
+ 
+ 	/* If we created a prepared statement, destroy it */
+ 	if (fmstate->p_name)
+ 	{
+ 		char		sql[64];
+ 		PGresult   *res;
+ 
+ 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+ 
+ 		/*
+ 		 * We don't use a PG_TRY block here, so be careful not to throw error
+ 		 * without releasing the PGresult.
+ 		 */
+ 		res = pgfdw_exec_query(fmstate->conn, sql);
+ 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ 		PQclear(res);
+ 		fmstate->p_name = NULL;
+ 	}
+ 
+ 	/* Release remote connection, and clear the now-stale pointer */
+ 	ReleaseConnection(fmstate->conn);
+ 	fmstate->conn = NULL;
+ }
+ 
+ /*
   * build_remote_returning
   *		Build a RETURNING targetlist of a remote query for performing an
   *		UPDATE/DELETE .. RETURNING on a join directly

Reply via email to