While working on the issue "Foreign join pushdown vs EvalPlanQual", I
happened to notice odd behaviors of late row locking in FDWs.  An
example will be shown below using the attached postgres_fdw patch, which
is basically the same as [1], but I changed its isolation level to READ
COMMITTED and modified it so that the output parameter "updated" of
RefetchForeignRow is updated accordingly.

[Create an environment]

mydatabase=# create table mytable (a int);
mydatabase=# insert into mytable values (1);
postgres=# create extension postgres_fdw;
postgres=# create server myserver foreign data wrapper postgres_fdw
options (dbname 'mydatabase');
postgres=# create user mapping for current_user server myserver;
postgres=# create foreign table ftable (a int) server myserver options
(table_name 'mytable');

[Run concurrent transactions that behave oddly]

mydatabase=# begin;
mydatabase=# update mytable set a = a + 1;
postgres=# select * from ftable where a = 1 for update;
mydatabase=# commit;
(After the commit, the following result will be shown in the terminal
for the postgres database.  Note that the result doesn't satify a = 1.)
 a
---
 2
(1 row)

I think the reason for that is because we don't check pushed-down quals
inside an EPQ testing even if what was fetched by RefetchForeignRow was
an updated version of the tuple rather than the same version previously
obtained.  So, to fix this, I'd like to propose that pushed-down quals
be checked in ForeignRecheck.

Comments welcome!

Best regards,
Etsuro Fujita

[1] http://www.postgresql.org/message-id/16016.1431455...@sss.pgh.pa.us
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 384,390 **** begin_remote_xact(ConnCacheEntry *entry)
  		if (IsolationIsSerializable())
  			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
  		else
! 			sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
  		do_sql_command(entry->conn, sql);
  		entry->xact_depth = 1;
  	}
--- 384,390 ----
  		if (IsolationIsSerializable())
  			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
  		else
! 			sql = "START TRANSACTION ISOLATION LEVEL READ COMMITTED";
  		do_sql_command(entry->conn, sql);
  		entry->xact_depth = 1;
  	}
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 89,94 **** typedef struct PgFdwRelationInfo
--- 89,96 ----
   *
   * 1) SELECT statement text to be sent to the remote server
   * 2) Integer list of attribute numbers retrieved by the SELECT
+  * 3) SELECT statement text to be sent to the remote server
+  * 4) Integer list of attribute numbers retrieved by the SELECT
   *
   * These items are indexed with the enum FdwScanPrivateIndex, so an item
   * can be fetched with list_nth().  For example, to get the SELECT statement:
***************
*** 99,105 **** enum FdwScanPrivateIndex
  	/* SQL statement to execute remotely (as a String node) */
  	FdwScanPrivateSelectSql,
  	/* Integer list of attribute numbers retrieved by the SELECT */
! 	FdwScanPrivateRetrievedAttrs
  };
  
  /*
--- 101,111 ----
  	/* SQL statement to execute remotely (as a String node) */
  	FdwScanPrivateSelectSql,
  	/* Integer list of attribute numbers retrieved by the SELECT */
! 	FdwScanPrivateRetrievedAttrs,
! 	/* SQL statement to execute remotely (as a String node) */
! 	FdwScanPrivateSelectSql2,
! 	/* Integer list of attribute numbers retrieved by SELECT */
! 	FdwScanPrivateRetrievedAttrs2
  };
  
  /*
***************
*** 187,192 **** typedef struct PgFdwModifyState
--- 193,222 ----
  } PgFdwModifyState;
  
  /*
+  * Execution state for fetching/locking foreign rows.
+  */
+ typedef struct PgFdwFetchState
+ {
+ 	Relation	rel;			/* relcache entry for the foreign table */
+ 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
+ 
+ 	/* for remote query execution */
+ 	PGconn	   *conn;			/* connection for the fetch */
+ 	char	   *p_name;			/* name of prepared statement, if created */
+ 
+ 	/* extracted fdw_private data */
+ 	char	   *query;			/* text of SELECT command */
+ 	List	   *retrieved_attrs;	/* attr numbers retrieved by SELECT */
+ 
+ 	/* info about parameters for prepared statement */
+ 	int			p_nums;			/* number of parameters to transmit */
+ 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
+ 
+ 	/* working memory context */
+ 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+ } PgFdwFetchState;
+ 
+ /*
   * Workspace for analyzing a foreign table.
   */
  typedef struct PgFdwAnalyzeState
***************
*** 277,282 **** static TupleTableSlot *postgresExecForeignDelete(EState *estate,
--- 307,318 ----
  static void postgresEndForeignModify(EState *estate,
  						 ResultRelInfo *resultRelInfo);
  static int	postgresIsForeignRelUpdatable(Relation rel);
+ static RowMarkType postgresGetForeignRowMarkType(RangeTblEntry *rte,
+ 							  LockClauseStrength strength);
+ static HeapTuple postgresRefetchForeignRow(EState *estate,
+ 						  ExecRowMark *erm,
+ 						  Datum rowid,
+ 						  bool *updated);
  static void postgresExplainForeignScan(ForeignScanState *node,
  						   ExplainState *es);
  static void postgresExplainForeignModify(ModifyTableState *mtstate,
***************
*** 307,321 **** static void get_remote_estimate(const char *sql,
  static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  						  EquivalenceClass *ec, EquivalenceMember *em,
  						  void *arg);
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static void prepare_foreign_modify(PgFdwModifyState *fmstate);
! static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  							  HeapTuple *rows, int targrows,
  							  double *totalrows,
--- 343,368 ----
  static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  						  EquivalenceClass *ec, EquivalenceMember *em,
  						  void *arg);
+ static List *create_foreign_fetch_info(PlannerInfo *root,
+ 						  RelOptInfo *baserel,
+ 						  RowMarkType markType);
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static char *setup_prep_stmt(PGconn *conn, char *query);
! static const char **convert_prep_stmt_params(ItemPointer tupleid,
! 						 TupleTableSlot *slot,
! 						 int p_nums,
! 						 FmgrInfo *p_flinfo,
! 						 List *target_attrs,
! 						 MemoryContext temp_context);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
+ static void init_foreign_fetch_state(EState *estate,
+ 						 ExecRowMark *erm,
+ 						 List *fdw_private,
+ 						 int eflags);
+ static void finish_foreign_fetch_state(EState *estate, ExecRowMark *erm);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  							  HeapTuple *rows, int targrows,
  							  double *totalrows,
***************
*** 359,364 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
--- 406,415 ----
  	routine->EndForeignModify = postgresEndForeignModify;
  	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
  
+ 	/* Functions for SELECT FOR UPDATE/SHARE row locking */
+ 	routine->GetForeignRowMarkType = postgresGetForeignRowMarkType;
+ 	routine->RefetchForeignRow = postgresRefetchForeignRow;
+ 
  	/* Support functions for EXPLAIN */
  	routine->ExplainForeignScan = postgresExplainForeignScan;
  	routine->ExplainForeignModify = postgresExplainForeignModify;
***************
*** 747,752 **** postgresGetForeignPlan(PlannerInfo *root,
--- 798,804 ----
  	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
  	Index		scan_relid = baserel->relid;
  	List	   *fdw_private;
+ 	List	   *fdw_private2 = NIL;
  	List	   *remote_conds = NIL;
  	List	   *local_exprs = NIL;
  	List	   *params_list = NIL;
***************
*** 837,856 **** postgresGetForeignPlan(PlannerInfo *root,
  			 * complete information about, and (b) it wouldn't work anyway on
  			 * older remote servers.  Likewise, we don't worry about NOWAIT.
  			 */
! 			switch (rc->strength)
  			{
! 				case LCS_NONE:
! 					/* No locking needed */
! 					break;
! 				case LCS_FORKEYSHARE:
! 				case LCS_FORSHARE:
! 					appendStringInfoString(&sql, " FOR SHARE");
! 					break;
! 				case LCS_FORNOKEYUPDATE:
! 				case LCS_FORUPDATE:
! 					appendStringInfoString(&sql, " FOR UPDATE");
! 					break;
  			}
  		}
  	}
  
--- 889,914 ----
  			 * complete information about, and (b) it wouldn't work anyway on
  			 * older remote servers.  Likewise, we don't worry about NOWAIT.
  			 */
! 			if (rc->markType == ROW_MARK_COPY)
  			{
! 				switch (rc->strength)
! 				{
! 					case LCS_NONE:
! 						/* No locking needed */
! 						break;
! 					case LCS_FORKEYSHARE:
! 					case LCS_FORSHARE:
! 						appendStringInfoString(&sql, " FOR SHARE");
! 						break;
! 					case LCS_FORNOKEYUPDATE:
! 					case LCS_FORUPDATE:
! 						appendStringInfoString(&sql, " FOR UPDATE");
! 						break;
! 				}
  			}
+ 			else
+ 				fdw_private2 = create_foreign_fetch_info(root, baserel,
+ 														 rc->markType);
  		}
  	}
  
***************
*** 860,865 **** postgresGetForeignPlan(PlannerInfo *root,
--- 918,925 ----
  	 */
  	fdw_private = list_make2(makeString(sql.data),
  							 retrieved_attrs);
+ 	if (fdw_private2)
+ 		fdw_private = list_concat(fdw_private, fdw_private2);
  
  	/*
  	 * Create the ForeignScan node from target list, local filtering
***************
*** 888,893 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 948,954 ----
  	EState	   *estate = node->ss.ps.state;
  	PgFdwScanState *fsstate;
  	RangeTblEntry *rte;
+ 	ExecRowMark *erm;
  	Oid			userid;
  	ForeignTable *table;
  	ForeignServer *server;
***************
*** 988,993 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 1049,1061 ----
  		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
  	else
  		fsstate->param_values = NULL;
+ 
+ 	/*
+ 	 * Initialize state for fetching/locking foreign rows if needed.
+ 	 */
+ 	erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ 	if (erm && erm->relation && erm->ermExtra == NULL)
+ 		init_foreign_fetch_state(estate, erm, fsplan->fdw_private, eflags);
  }
  
  /*
***************
*** 1095,1101 **** postgresReScanForeignScan(ForeignScanState *node)
--- 1163,1172 ----
  static void
  postgresEndForeignScan(ForeignScanState *node)
  {
+ 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+ 	EState	   *estate = node->ss.ps.state;
  	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ 	ExecRowMark *erm;
  
  	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
  	if (fsstate == NULL)
***************
*** 1109,1114 **** postgresEndForeignScan(ForeignScanState *node)
--- 1180,1192 ----
  	ReleaseConnection(fsstate->conn);
  	fsstate->conn = NULL;
  
+ 	/*
+ 	 * Finish state for fetching/locking foreign rows if needed.
+ 	 */
+ 	erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ 	if (erm && erm->relation && erm->ermExtra != NULL)
+ 		finish_foreign_fetch_state(estate, erm);
+ 
  	/* MemoryContexts will be deleted automatically. */
  }
  
***************
*** 1406,1415 **** postgresExecForeignInsert(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1484,1497 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(NULL, slot,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
***************
*** 1466,1472 **** postgresExecForeignUpdate(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
--- 1548,1554 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
***************
*** 1477,1485 **** postgresExecForeignUpdate(EState *estate,
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate,
! 										(ItemPointer) DatumGetPointer(datum),
! 										slot);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1559,1570 ----
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! 										slot,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
***************
*** 1536,1542 **** postgresExecForeignDelete(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
--- 1621,1627 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
***************
*** 1547,1555 **** postgresExecForeignDelete(EState *estate,
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate,
! 										(ItemPointer) DatumGetPointer(datum),
! 										NULL);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1632,1643 ----
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! 										NULL,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
***************
*** 1671,1676 **** postgresIsForeignRelUpdatable(Relation rel)
--- 1759,1862 ----
  }
  
  /*
+  * postgresGetForeignRowMarkType
+  *		Get rowmark type to use for a particular table
+  */
+ static RowMarkType
+ postgresGetForeignRowMarkType(RangeTblEntry *rte, LockClauseStrength strength)
+ {
+ 	switch (strength)
+ 	{
+ 		case LCS_NONE:
+ 			return ROW_MARK_REFERENCE;
+ 		case LCS_FORKEYSHARE:
+ 			return ROW_MARK_KEYSHARE;
+ 		case LCS_FORSHARE:
+ 			return ROW_MARK_SHARE;
+ 		case LCS_FORNOKEYUPDATE:
+ 			return ROW_MARK_NOKEYEXCLUSIVE;
+ 		case LCS_FORUPDATE:
+ 			return ROW_MARK_EXCLUSIVE;
+ 	}
+ 	return ROW_MARK_COPY;		/* shouldn't happen */
+ }
+ 
+ /*
+  * postgresRefetchForeignRow
+  *		Re-fetch one tuple from a foreign table, possibly locking it
+  */
+ static HeapTuple
+ postgresRefetchForeignRow(EState *estate,
+ 						  ExecRowMark *erm,
+ 						  Datum rowid,
+ 						  bool *updated)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->ermExtra;
+ 	ItemPointer tupleid = (ItemPointer) DatumGetPointer(rowid);
+ 	const char **p_values;
+ 	PGresult   *res;
+ 	HeapTuple	tuple;
+ 
+ 	*updated = false;
+ 
+ 	/* Set up the prepared statement on the remote server, if we didn't yet */
+ 	if (!ffstate->p_name)
+ 		ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+ 
+ 	/* Convert parameters needed by prepared statement to text form */
+ 	p_values = convert_prep_stmt_params(tupleid, NULL,
+ 										ffstate->p_nums,
+ 										ffstate->p_flinfo,
+ 										NIL,
+ 										ffstate->temp_cxt);
+ 
+ 	/*
+ 	 * Execute the prepared statement, and check for success.
+ 	 *
+ 	 * We don't use a PG_TRY block here, so be careful not to throw error
+ 	 * without releasing the PGresult.
+ 	 */
+ 	res = PQexecPrepared(ffstate->conn,
+ 						 ffstate->p_name,
+ 						 ffstate->p_nums,
+ 						 p_values,
+ 						 NULL,
+ 						 NULL,
+ 						 0);
+ 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+ 
+ 	/* PGresult must be released before leaving this function. */
+ 	PG_TRY();
+ 	{
+ 		/* Create the tuple */
+ 		tuple = make_tuple_from_result_row(res, 0,
+ 										   ffstate->rel,
+ 										   ffstate->attinmeta,
+ 										   ffstate->retrieved_attrs,
+ 										   ffstate->temp_cxt);
+ 		tuple->t_tableOid = erm->relid;
+ 
+ 		PQclear(res);
+ 		res = NULL;
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		if (res)
+ 			PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+ 
+ 	MemoryContextReset(ffstate->temp_cxt);
+ 
+ 	if (!ItemPointerEquals(tupleid, &(tuple->t_self)))
+ 		*updated = true;
+ 
+ 	return tuple;
+ }
+ 
+ /*
   * postgresExplainForeignScan
   *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
   */
***************
*** 1933,1938 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 2119,2170 ----
  }
  
  /*
+  * Create the FDW-private information for fetching/locking foreign rows.
+  */
+ static List *
+ create_foreign_fetch_info(PlannerInfo *root,
+ 						  RelOptInfo *baserel,
+ 						  RowMarkType markType)
+ {
+ 	StringInfoData sql;
+ 	List	   *retrieved_attrs;
+ 	Bitmapset  *attrs_used = NULL;
+ 
+ 	/*
+ 	 * Build the query string to be sent for execution.
+ 	 */
+ 	initStringInfo(&sql);
+ 	/* Add ctid to attrs_used. */
+ 	attrs_used = bms_add_member(attrs_used,
+ 								SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber);
+ 	/* Add a whole-row var to attrs_used to retrieve all the columns. */
+ 	attrs_used = bms_add_member(attrs_used,
+ 								0 - FirstLowInvalidHeapAttributeNumber);
+ 	deparseSelectSql(&sql, root, baserel, attrs_used, &retrieved_attrs);
+ 	appendStringInfoString(&sql, " WHERE ctid = $1");
+ 
+ 	switch (markType)
+ 	{
+ 		case ROW_MARK_EXCLUSIVE:
+ 		case ROW_MARK_NOKEYEXCLUSIVE:
+ 			appendStringInfoString(&sql, " FOR UPDATE");
+ 			break;
+ 		case ROW_MARK_SHARE:
+ 		case ROW_MARK_KEYSHARE:
+ 			appendStringInfoString(&sql, " FOR SHARE");
+ 			break;
+ 		default:
+ 			break;
+ 	}
+ 
+ 	/*
+ 	 * Build the fdw_private list that will be available to the executor.
+ 	 * Items in the list must match enum FdwFetchPrivateIndex, above.
+ 	 */
+ 	return list_make2(makeString(sql.data), retrieved_attrs);
+ }
+ 
+ /*
   * Create cursor for node's query with current parameter values.
   */
  static void
***************
*** 2169,2179 **** close_cursor(PGconn *conn, unsigned int cursor_number)
  }
  
  /*
!  * prepare_foreign_modify
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
   */
! static void
! prepare_foreign_modify(PgFdwModifyState *fmstate)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
--- 2401,2412 ----
  }
  
  /*
!  * setup_prep_stmt
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
+  *		or re-fetching tuples for EvalPlanQual rechecking
   */
! static char *
! setup_prep_stmt(PGconn *conn, char *query)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
***************
*** 2181,2187 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(fmstate->conn));
  	p_name = pstrdup(prep_name);
  
  	/*
--- 2414,2420 ----
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(conn));
  	p_name = pstrdup(prep_name);
  
  	/*
***************
*** 2194,2211 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQprepare(fmstate->conn,
! 					p_name,
! 					fmstate->query,
! 					0,
! 					NULL);
  
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	fmstate->p_name = p_name;
  }
  
  /*
--- 2427,2440 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQprepare(conn, p_name, query, 0, NULL);
  
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, conn, true, query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	return p_name;
  }
  
  /*
***************
*** 2218,2253 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
  
! 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && fmstate->target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, fmstate->target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
--- 2447,2485 ----
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(ItemPointer tupleid,
! 						 TupleTableSlot *slot,
! 						 int p_nums,
! 						 FmgrInfo *p_flinfo,
! 						 List *target_attrs,
! 						 MemoryContext temp_context)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(temp_context);
  
! 	p_values = (const char **) palloc(sizeof(char *) * p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
***************
*** 2257,2263 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
--- 2489,2495 ----
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
***************
*** 2265,2271 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == fmstate->p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
--- 2497,2503 ----
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
***************
*** 2305,2310 **** store_returning_result(PgFdwModifyState *fmstate,
--- 2537,2645 ----
  }
  
  /*
+  * init_foreign_fetch_state
+  *		Initialize an execution state for fetching/locking foreign rows
+  */
+ static void
+ init_foreign_fetch_state(EState *estate,
+ 						 ExecRowMark *erm,
+ 						 List *fdw_private,
+ 						 int eflags)
+ {
+ 	PgFdwFetchState *ffstate;
+ 	Relation	rel = erm->relation;
+ 	RangeTblEntry *rte;
+ 	Oid			userid;
+ 	ForeignTable *table;
+ 	ForeignServer *server;
+ 	UserMapping *user;
+ 	Oid			typefnoid;
+ 	bool		isvarlena;
+ 
+ 	/* Begin constructing PgFdwFetchState. */
+ 	ffstate = (PgFdwFetchState *) palloc0(sizeof(PgFdwFetchState));
+ 	ffstate->rel = rel;
+ 
+ 	/*
+ 	 * Identify which user to do the remote access as.  This should match what
+ 	 * ExecCheckRTEPerms() does.
+ 	 */
+ 	rte = rt_fetch(erm->rti, estate->es_range_table);
+ 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+ 
+ 	/* Get info about foreign table. */
+ 	table = GetForeignTable(RelationGetRelid(rel));
+ 	server = GetForeignServer(table->serverid);
+ 	user = GetUserMapping(userid, server->serverid);
+ 
+ 	/* Open connection; report that we'll create a prepared statement. */
+ 	ffstate->conn = GetConnection(server, user, true);
+ 	ffstate->p_name = NULL;		/* prepared statement not made yet */
+ 
+ 	/* Deconstruct fdw_private data. */
+ 	ffstate->query = strVal(list_nth(fdw_private,
+ 									 FdwScanPrivateSelectSql2));
+ 	ffstate->retrieved_attrs = (List *) list_nth(fdw_private,
+ 											  FdwScanPrivateRetrievedAttrs2);
+ 
+ 	/* Create context for per-tuple temp workspace. */
+ 	ffstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ 											  "postgres_fdw temporary data",
+ 											  ALLOCSET_SMALL_MINSIZE,
+ 											  ALLOCSET_SMALL_INITSIZE,
+ 											  ALLOCSET_SMALL_MAXSIZE);
+ 
+ 	/* Prepare for input conversion of SELECT results. */
+ 	ffstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
+ 
+ 	/* Prepare for output conversion of parameters used in prepared stmt. */
+ 	ffstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+ 	ffstate->p_nums = 0;
+ 
+ 	/* Only one transmittable parameter will be ctid */
+ 	getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ 	fmgr_info(typefnoid, &ffstate->p_flinfo[ffstate->p_nums]);
+ 	ffstate->p_nums++;
+ 
+ 	erm->ermExtra = ffstate;
+ }
+ 
+ /*
+  * finish_foreign_fetch_state
+  *		Finish an execution state for fetching/locking foreign rows
+  */
+ static void
+ finish_foreign_fetch_state(EState *estate, ExecRowMark *erm)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->ermExtra;
+ 
+ 	/* If we created a prepared statement, destroy it */
+ 	if (ffstate->p_name)
+ 	{
+ 		char		sql[64];
+ 		PGresult   *res;
+ 
+ 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", ffstate->p_name);
+ 
+ 		/*
+ 		 * We don't use a PG_TRY block here, so be careful not to throw error
+ 		 * without releasing the PGresult.
+ 		 */
+ 		res = PQexec(ffstate->conn, sql);
+ 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ 			pgfdw_report_error(ERROR, res, ffstate->conn, true, sql);
+ 		PQclear(res);
+ 		ffstate->p_name = NULL;
+ 	}
+ 
+ 	/* Release remote connection */
+ 	ReleaseConnection(ffstate->conn);
+ 	ffstate->conn = NULL;
+ 
+ 	erm->ermExtra = NULL;
+ }
+ 
+ /*
   * postgresAnalyzeForeignTable
   *		Test whether analyzing this foreign table is supported
   */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to