Etsuro Fujita <fujita.ets...@lab.ntt.co.jp> writes:
> On 2015/05/12 7:42, Tom Lane wrote:
>> I don't much like the division of labor between LockForeignRow and
>> FetchForeignRow.  In principle that would lead to not one but two
>> extra remote accesses per locked row in SELECT FOR UPDATE, at least
>> in the case that an EvalPlanQual recheck is required.  (I see that
>> in your prototype patch for postgres_fdw you attempt to avoid that
>> by saving a retrieved tuple in LockForeignRow and then returning it
>> in FetchForeignRow, but that seems extremely ugly and bug-prone,
>> since there is nothing in the API spec insisting that those calls match
>> up one-to-one.)  The fact that locking and fetching a tuple are separate
>> operations in the heapam API is a historical artifact that probably
>> doesn't make sense to duplicate in the FDW API.

> I understand your concern about the postgres_fdw patch.  However, I
> think we should divide this into the two routines as the core patch
> does, because I think that would allow an FDW author to implement these
> routines so as to improve the efficiency for scenarios that seldom need
> fetching, by not retrieving and saving locked tuples in LockForeignRow.

I find it hard to envision a situation where an FDW could lock a row
without being able to fetch its contents more or less for free.  We have
IIRC discussed changing the way that works even for heapam, since the
current design requires multiple buffer lock/unlock cycles which aren't
free either.  In any case, I think that the temptation to do probably-
buggy stuff like what you did in your prototype would be too strong for
most people, and that we're much better off with a simpler one-step API.
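
To make the pairing hazard concrete, here is a toy sketch in C.  It is
not the FDW API: ToyRow, toy_lock_row, toy_fetch_row and toy_refetch_row
are hypothetical names over an in-memory table.  It only assumes a
two-step design in which the lock call stashes the row for a later fetch
call, compared with a one-step call that locks and returns the row
together.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins; none of these names exist in PostgreSQL. */
typedef struct ToyRow
{
	int			id;
	int			locked;
	char		payload[16];
} ToyRow;

static ToyRow toy_table[] = {
	{1, 0, "alpha"},
	{2, 0, "beta"},
};

/* Look up a row by id; returns NULL if there is no such row. */
ToyRow *
toy_find(int id)
{
	size_t		i;

	for (i = 0; i < sizeof(toy_table) / sizeof(toy_table[0]); i++)
		if (toy_table[i].id == id)
			return &toy_table[i];
	return NULL;
}

/* Two-step design: lock stashes the row, fetch hands back the stash. */
static ToyRow *saved_row = NULL;

void
toy_lock_row(int id)
{
	ToyRow	   *row = toy_find(id);

	assert(row != NULL);
	row->locked = 1;
	saved_row = row;			/* only correct if fetch is called next */
}

const char *
toy_fetch_row(int id)
{
	(void) id;					/* the stash is trusted blindly: the hazard */
	return saved_row ? saved_row->payload : NULL;
}

/* One-step design: lock the row and return its contents in one call. */
const char *
toy_refetch_row(int id)
{
	ToyRow	   *row = toy_find(id);

	if (row == NULL)
		return NULL;
	row->locked = 1;
	return row->payload;
}
```

Calling toy_lock_row(1), toy_lock_row(2) and then toy_fetch_row(1) hands
back row 2's payload, because nothing forces the calls to pair up.
toy_refetch_row(1) has no saved state to get out of sync.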

An additional advantage of the way I changed this is that it makes the
logic in nodeLockRows simpler too; we no longer need the very messy
hack added by commit 2db576ba8c449fca.

>> Another problem is that we fail to detect whether an EvalPlanQual recheck
>> is required during a SELECT FOR UPDATE on a remote table, which we
>> certainly ought to do if the objective is to make postgres_fdw semantics
>> match the local ones.

> I think that is interesting in theory, but I'm not sure that is worth
> much in practice.

Hm, well, AFAICS the entire point of this patch is to make it possible for
FDWs to duplicate the semantics used for local tables, so I'm not sure
why you'd want to ignore that aspect of it.

Anyway, I redid the patch along those lines, improved the documentation,
and have committed it.

I did a very basic update of your postgres_fdw patch to test this with,
and attach that so that you don't have to repeat the effort.  I'm not sure
whether we want to try to convert that into something committable.  I'm
afraid that the extra round trips involved in doing row locking this way
will be so expensive that no one really wants it for postgres_fdw.  It's
more credible that FDWs operating against local storage would have use
for it.
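
As a rough back-of-the-envelope sketch of that cost, the C functions
below count remote round trips under simplifying assumptions: the scan
fetches rows in fixed-size batches, late locking issues one re-fetch
query per locked row (as the attached patch does with its
"WHERE ctid = $1" statement), and cursor setup and PREPARE are ignored.
The function names and the batch size used in the example are
illustrative assumptions, not measurements.

```c
#include <assert.h>

/* Round trips to scan nrows rows in batches of batch_size (ceiling). */
long
scan_round_trips(long nrows, long batch_size)
{
	assert(nrows >= 0 && batch_size > 0);
	return (nrows + batch_size - 1) / batch_size;
}

/* Early locking: FOR UPDATE/FOR SHARE rides on the scan query itself. */
long
early_lock_round_trips(long nrows, long batch_size)
{
	return scan_round_trips(nrows, batch_size);
}

/* Late locking: the scan, plus one re-fetch query per locked row. */
long
late_lock_round_trips(long nrows, long batch_size)
{
	return scan_round_trips(nrows, batch_size) + nrows;
}
```

With 1000 locked rows and an assumed batch size of 100, early locking
needs 10 round trips under this model and late locking needs 1010.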

                        regards, tom lane

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 0c44260..a122c9e 100644
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
*************** typedef struct PgFdwRelationInfo
*** 88,93 ****
--- 88,95 ----
   *
   * 1) SELECT statement text to be sent to the remote server
   * 2) Integer list of attribute numbers retrieved by the SELECT
+  * 3) SELECT statement text for re-fetching/locking a row, if needed
+  * 4) Integer list of attribute numbers retrieved by that SELECT
   *
   * These items are indexed with the enum FdwScanPrivateIndex, so an item
   * can be fetched with list_nth().  For example, to get the SELECT statement:
*************** enum FdwScanPrivateIndex
*** 98,104 ****
  	/* SQL statement to execute remotely (as a String node) */
  	FdwScanPrivateSelectSql,
  	/* Integer list of attribute numbers retrieved by the SELECT */
! 	FdwScanPrivateRetrievedAttrs
  };
  
  /*
--- 100,110 ----
  	/* SQL statement to execute remotely (as a String node) */
  	FdwScanPrivateSelectSql,
  	/* Integer list of attribute numbers retrieved by the SELECT */
! 	FdwScanPrivateRetrievedAttrs,
! 	/* SQL statement to re-fetch/lock a row remotely (as a String node) */
! 	FdwScanPrivateSelectSql2,
! 	/* Integer list of attribute numbers retrieved by that SELECT */
! 	FdwScanPrivateRetrievedAttrs2
  };
  
  /*
*************** typedef struct PgFdwModifyState
*** 186,191 ****
--- 192,221 ----
  } PgFdwModifyState;
  
  /*
+  * Execution state for fetching/locking foreign rows.
+  */
+ typedef struct PgFdwFetchState
+ {
+ 	Relation	rel;			/* relcache entry for the foreign table */
+ 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
+ 
+ 	/* for remote query execution */
+ 	PGconn	   *conn;			/* connection for the fetch */
+ 	char	   *p_name;			/* name of prepared statement, if created */
+ 
+ 	/* extracted fdw_private data */
+ 	char	   *query;			/* text of SELECT command */
+ 	List	   *retrieved_attrs;	/* attr numbers retrieved by SELECT */
+ 
+ 	/* info about parameters for prepared statement */
+ 	int			p_nums;			/* number of parameters to transmit */
+ 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
+ 
+ 	/* working memory context */
+ 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+ } PgFdwFetchState;
+ 
+ /*
   * Workspace for analyzing a foreign table.
   */
  typedef struct PgFdwAnalyzeState
*************** static TupleTableSlot *postgresExecForei
*** 276,281 ****
--- 306,317 ----
  static void postgresEndForeignModify(EState *estate,
  						 ResultRelInfo *resultRelInfo);
  static int	postgresIsForeignRelUpdatable(Relation rel);
+ static RowMarkType postgresGetForeignRowMarkType(RangeTblEntry *rte,
+ 							  LockClauseStrength strength);
+ static HeapTuple postgresRefetchForeignRow(EState *estate,
+ 						  ExecRowMark *erm,
+ 						  Datum rowid,
+ 						  bool *updated);
  static void postgresExplainForeignScan(ForeignScanState *node,
  						   ExplainState *es);
  static void postgresExplainForeignModify(ModifyTableState *mtstate,
*************** static void get_remote_estimate(const ch
*** 306,320 ****
  static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  						  EquivalenceClass *ec, EquivalenceMember *em,
  						  void *arg);
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static void prepare_foreign_modify(PgFdwModifyState *fmstate);
! static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  							  HeapTuple *rows, int targrows,
  							  double *totalrows,
--- 342,367 ----
  static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  						  EquivalenceClass *ec, EquivalenceMember *em,
  						  void *arg);
+ static List *create_foreign_fetch_info(PlannerInfo *root,
+ 						  RelOptInfo *baserel,
+ 						  RowMarkType markType);
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static char *setup_prep_stmt(PGconn *conn, char *query);
! static const char **convert_prep_stmt_params(ItemPointer tupleid,
! 						 TupleTableSlot *slot,
! 						 int p_nums,
! 						 FmgrInfo *p_flinfo,
! 						 List *target_attrs,
! 						 MemoryContext temp_context);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
+ static void init_foreign_fetch_state(EState *estate,
+ 						 ExecRowMark *erm,
+ 						 List *fdw_private,
+ 						 int eflags);
+ static void finish_foreign_fetch_state(EState *estate, ExecRowMark *erm);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  							  HeapTuple *rows, int targrows,
  							  double *totalrows,
*************** postgres_fdw_handler(PG_FUNCTION_ARGS)
*** 358,363 ****
--- 405,414 ----
  	routine->EndForeignModify = postgresEndForeignModify;
  	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
  
+ 	/* Functions for SELECT FOR UPDATE/SHARE row locking */
+ 	routine->GetForeignRowMarkType = postgresGetForeignRowMarkType;
+ 	routine->RefetchForeignRow = postgresRefetchForeignRow;
+ 
  	/* Support functions for EXPLAIN */
  	routine->ExplainForeignScan = postgresExplainForeignScan;
  	routine->ExplainForeignModify = postgresExplainForeignModify;
*************** postgresGetForeignPlan(PlannerInfo *root
*** 746,751 ****
--- 797,803 ----
  	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
  	Index		scan_relid = baserel->relid;
  	List	   *fdw_private;
+ 	List	   *fdw_private2 = NIL;
  	List	   *remote_conds = NIL;
  	List	   *local_exprs = NIL;
  	List	   *params_list = NIL;
*************** postgresGetForeignPlan(PlannerInfo *root
*** 836,855 ****
  			 * complete information about, and (b) it wouldn't work anyway on
  			 * older remote servers.  Likewise, we don't worry about NOWAIT.
  			 */
! 			switch (rc->strength)
  			{
! 				case LCS_NONE:
! 					/* No locking needed */
! 					break;
! 				case LCS_FORKEYSHARE:
! 				case LCS_FORSHARE:
! 					appendStringInfoString(&sql, " FOR SHARE");
! 					break;
! 				case LCS_FORNOKEYUPDATE:
! 				case LCS_FORUPDATE:
! 					appendStringInfoString(&sql, " FOR UPDATE");
! 					break;
  			}
  		}
  	}
  
--- 888,913 ----
  			 * complete information about, and (b) it wouldn't work anyway on
  			 * older remote servers.  Likewise, we don't worry about NOWAIT.
  			 */
! 			if (rc->markType == ROW_MARK_COPY)
  			{
! 				switch (rc->strength)
! 				{
! 					case LCS_NONE:
! 						/* No locking needed */
! 						break;
! 					case LCS_FORKEYSHARE:
! 					case LCS_FORSHARE:
! 						appendStringInfoString(&sql, " FOR SHARE");
! 						break;
! 					case LCS_FORNOKEYUPDATE:
! 					case LCS_FORUPDATE:
! 						appendStringInfoString(&sql, " FOR UPDATE");
! 						break;
! 				}
  			}
+ 			else
+ 				fdw_private2 = create_foreign_fetch_info(root, baserel,
+ 														 rc->markType);
  		}
  	}
  
*************** postgresGetForeignPlan(PlannerInfo *root
*** 859,864 ****
--- 917,924 ----
  	 */
  	fdw_private = list_make2(makeString(sql.data),
  							 retrieved_attrs);
+ 	if (fdw_private2)
+ 		fdw_private = list_concat(fdw_private, fdw_private2);
  
  	/*
  	 * Create the ForeignScan node from target list, local filtering
*************** postgresBeginForeignScan(ForeignScanStat
*** 887,892 ****
--- 947,953 ----
  	EState	   *estate = node->ss.ps.state;
  	PgFdwScanState *fsstate;
  	RangeTblEntry *rte;
+ 	ExecRowMark *erm;
  	Oid			userid;
  	ForeignTable *table;
  	ForeignServer *server;
*************** postgresBeginForeignScan(ForeignScanStat
*** 987,992 ****
--- 1048,1060 ----
  		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
  	else
  		fsstate->param_values = NULL;
+ 
+ 	/*
+ 	 * Initialize state for fetching/locking foreign rows if needed.
+ 	 */
+ 	erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ 	if (erm && erm->relation && erm->ermExtra == NULL)
+ 		init_foreign_fetch_state(estate, erm, fsplan->fdw_private, eflags);
  }
  
  /*
*************** postgresReScanForeignScan(ForeignScanSta
*** 1094,1100 ****
--- 1162,1171 ----
  static void
  postgresEndForeignScan(ForeignScanState *node)
  {
+ 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+ 	EState	   *estate = node->ss.ps.state;
  	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ 	ExecRowMark *erm;
  
  	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
  	if (fsstate == NULL)
*************** postgresEndForeignScan(ForeignScanState 
*** 1108,1113 ****
--- 1179,1191 ----
  	ReleaseConnection(fsstate->conn);
  	fsstate->conn = NULL;
  
+ 	/*
+ 	 * Finish state for fetching/locking foreign rows if needed.
+ 	 */
+ 	erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ 	if (erm && erm->relation && erm->ermExtra != NULL)
+ 		finish_foreign_fetch_state(estate, erm);
+ 
  	/* MemoryContexts will be deleted automatically. */
  }
  
*************** postgresExecForeignInsert(EState *estate
*** 1405,1414 ****
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1483,1496 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(NULL, slot,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
*************** postgresExecForeignUpdate(EState *estate
*** 1465,1471 ****
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
--- 1547,1553 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
*************** postgresExecForeignUpdate(EState *estate
*** 1476,1484 ****
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate,
! 										(ItemPointer) DatumGetPointer(datum),
! 										slot);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1558,1569 ----
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! 										slot,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
*************** postgresExecForeignDelete(EState *estate
*** 1535,1541 ****
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
--- 1620,1626 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
*************** postgresExecForeignDelete(EState *estate
*** 1546,1554 ****
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate,
! 										(ItemPointer) DatumGetPointer(datum),
! 										NULL);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1631,1642 ----
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! 										NULL,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
*************** postgresIsForeignRelUpdatable(Relation r
*** 1670,1675 ****
--- 1758,1857 ----
  }
  
  /*
+  * postgresGetForeignRowMarkType
+  *		Get rowmark type to use for a particular table
+  */
+ static RowMarkType
+ postgresGetForeignRowMarkType(RangeTblEntry *rte, LockClauseStrength strength)
+ {
+ 	switch (strength)
+ 	{
+ 		case LCS_NONE:
+ 			return ROW_MARK_REFERENCE;
+ 		case LCS_FORKEYSHARE:
+ 			return ROW_MARK_KEYSHARE;
+ 		case LCS_FORSHARE:
+ 			return ROW_MARK_SHARE;
+ 		case LCS_FORNOKEYUPDATE:
+ 			return ROW_MARK_NOKEYEXCLUSIVE;
+ 		case LCS_FORUPDATE:
+ 			return ROW_MARK_EXCLUSIVE;
+ 	}
+ 	return ROW_MARK_COPY;		/* shouldn't happen */
+ }
+ 
+ /*
+  * postgresRefetchForeignRow
+  *		Re-fetch one tuple from a foreign table, possibly locking it
+  */
+ static HeapTuple
+ postgresRefetchForeignRow(EState *estate,
+ 						  ExecRowMark *erm,
+ 						  Datum rowid,
+ 						  bool *updated)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->ermExtra;
+ 	ItemPointer tupleid = (ItemPointer) DatumGetPointer(rowid);
+ 	const char **p_values;
+ 	PGresult   *res;
+ 	HeapTuple	tuple;
+ 
+ 	/* Set up the prepared statement on the remote server, if we didn't yet */
+ 	if (!ffstate->p_name)
+ 		ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+ 
+ 	/* Convert parameters needed by prepared statement to text form */
+ 	p_values = convert_prep_stmt_params(tupleid, NULL,
+ 										ffstate->p_nums,
+ 										ffstate->p_flinfo,
+ 										NIL,
+ 										ffstate->temp_cxt);
+ 
+ 	/*
+ 	 * Execute the prepared statement, and check for success.
+ 	 *
+ 	 * We don't use a PG_TRY block here, so be careful not to throw error
+ 	 * without releasing the PGresult.
+ 	 */
+ 	res = PQexecPrepared(ffstate->conn,
+ 						 ffstate->p_name,
+ 						 ffstate->p_nums,
+ 						 p_values,
+ 						 NULL,
+ 						 NULL,
+ 						 0);
+ 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+ 
+ 	/* PGresult must be released before leaving this function. */
+ 	PG_TRY();
+ 	{
+ 		/* Create the tuple */
+ 		tuple = make_tuple_from_result_row(res, 0,
+ 										   ffstate->rel,
+ 										   ffstate->attinmeta,
+ 										   ffstate->retrieved_attrs,
+ 										   ffstate->temp_cxt);
+ 		tuple->t_self = *tupleid;
+ 		tuple->t_tableOid = erm->relid;
+ 
+ 		PQclear(res);
+ 		res = NULL;
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		if (res)
+ 			PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+ 
+ 	MemoryContextReset(ffstate->temp_cxt);
+ 
+ 	return tuple;
+ }
+ 
+ /*
   * postgresExplainForeignScan
   *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
   */
*************** ec_member_matches_foreign(PlannerInfo *r
*** 1932,1937 ****
--- 2114,2162 ----
  }
  
  /*
+  * Create the FDW-private information for fetching/locking foreign rows.
+  */
+ static List *
+ create_foreign_fetch_info(PlannerInfo *root,
+ 						  RelOptInfo *baserel,
+ 						  RowMarkType markType)
+ {
+ 	StringInfoData sql;
+ 	List	   *retrieved_attrs;
+ 	Bitmapset  *attrs_used = NULL;
+ 
+ 	/*
+ 	 * Build the query string to be sent for execution.
+ 	 */
+ 	initStringInfo(&sql);
+ 	/* Add a whole-row var to attrs_used to retrieve all the columns. */
+ 	attrs_used = bms_add_member(attrs_used,
+ 								0 - FirstLowInvalidHeapAttributeNumber);
+ 	deparseSelectSql(&sql, root, baserel, attrs_used, &retrieved_attrs);
+ 	appendStringInfoString(&sql, " WHERE ctid = $1");
+ 
+ 	switch (markType)
+ 	{
+ 		case ROW_MARK_EXCLUSIVE:
+ 		case ROW_MARK_NOKEYEXCLUSIVE:
+ 			appendStringInfoString(&sql, " FOR UPDATE");
+ 			break;
+ 		case ROW_MARK_SHARE:
+ 		case ROW_MARK_KEYSHARE:
+ 			appendStringInfoString(&sql, " FOR SHARE");
+ 			break;
+ 		default:
+ 			break;
+ 	}
+ 
+ 	/*
+ 	 * Build the fdw_private list that will be available to the executor.
+ 	 * Items must match the "2" entries of enum FdwScanPrivateIndex, above.
+ 	 */
+ 	return list_make2(makeString(sql.data), retrieved_attrs);
+ }
+ 
+ /*
   * Create cursor for node's query with current parameter values.
   */
  static void
*************** close_cursor(PGconn *conn, unsigned int 
*** 2168,2178 ****
  }
  
  /*
!  * prepare_foreign_modify
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
   */
! static void
! prepare_foreign_modify(PgFdwModifyState *fmstate)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
--- 2393,2404 ----
  }
  
  /*
!  * setup_prep_stmt
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
+  *		or re-fetching tuples for EvalPlanQual rechecking
   */
! static char *
! setup_prep_stmt(PGconn *conn, char *query)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
*************** prepare_foreign_modify(PgFdwModifyState 
*** 2180,2186 ****
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(fmstate->conn));
  	p_name = pstrdup(prep_name);
  
  	/*
--- 2406,2412 ----
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(conn));
  	p_name = pstrdup(prep_name);
  
  	/*
*************** prepare_foreign_modify(PgFdwModifyState 
*** 2193,2210 ****
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQprepare(fmstate->conn,
! 					p_name,
! 					fmstate->query,
! 					0,
! 					NULL);
  
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	fmstate->p_name = p_name;
  }
  
  /*
--- 2419,2432 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQprepare(conn, p_name, query, 0, NULL);
  
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, conn, true, query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	return p_name;
  }
  
  /*
*************** prepare_foreign_modify(PgFdwModifyState 
*** 2217,2252 ****
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
  
! 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && fmstate->target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, fmstate->target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
--- 2439,2477 ----
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(ItemPointer tupleid,
! 						 TupleTableSlot *slot,
! 						 int p_nums,
! 						 FmgrInfo *p_flinfo,
! 						 List *target_attrs,
! 						 MemoryContext temp_context)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(temp_context);
  
! 	p_values = (const char **) palloc(sizeof(char *) * p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
*************** convert_prep_stmt_params(PgFdwModifyStat
*** 2256,2262 ****
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
--- 2481,2487 ----
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
*************** convert_prep_stmt_params(PgFdwModifyStat
*** 2264,2270 ****
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == fmstate->p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
--- 2489,2495 ----
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
*************** store_returning_result(PgFdwModifyState 
*** 2304,2309 ****
--- 2529,2635 ----
  }
  
  /*
+  * init_foreign_fetch_state
+  *		Initialize an execution state for fetching/locking foreign rows
+  */
+ static void
+ init_foreign_fetch_state(EState *estate,
+ 						 ExecRowMark *erm,
+ 						 List *fdw_private,
+ 						 int eflags)
+ {
+ 	PgFdwFetchState *ffstate;
+ 	Relation	rel = erm->relation;
+ 	RangeTblEntry *rte;
+ 	Oid			userid;
+ 	ForeignTable *table;
+ 	ForeignServer *server;
+ 	UserMapping *user;
+ 	Oid			typefnoid;
+ 	bool		isvarlena;
+ 
+ 	/* Begin constructing PgFdwFetchState. */
+ 	ffstate = (PgFdwFetchState *) palloc0(sizeof(PgFdwFetchState));
+ 	ffstate->rel = rel;
+ 
+ 	/*
+ 	 * Identify which user to do the remote access as.  This should match what
+ 	 * ExecCheckRTEPerms() does.
+ 	 */
+ 	rte = rt_fetch(erm->rti, estate->es_range_table);
+ 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+ 
+ 	/* Get info about foreign table. */
+ 	table = GetForeignTable(RelationGetRelid(rel));
+ 	server = GetForeignServer(table->serverid);
+ 	user = GetUserMapping(userid, server->serverid);
+ 
+ 	/* Open connection; report that we'll create a prepared statement. */
+ 	ffstate->conn = GetConnection(server, user, true);
+ 	ffstate->p_name = NULL;		/* prepared statement not made yet */
+ 
+ 	/* Deconstruct fdw_private data. */
+ 	ffstate->query = strVal(list_nth(fdw_private,
+ 									 FdwScanPrivateSelectSql2));
+ 	ffstate->retrieved_attrs = (List *) list_nth(fdw_private,
+ 											  FdwScanPrivateRetrievedAttrs2);
+ 
+ 	/* Create context for per-tuple temp workspace. */
+ 	ffstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ 											  "postgres_fdw temporary data",
+ 											  ALLOCSET_SMALL_MINSIZE,
+ 											  ALLOCSET_SMALL_INITSIZE,
+ 											  ALLOCSET_SMALL_MAXSIZE);
+ 
+ 	/* Prepare for input conversion of SELECT results. */
+ 	ffstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
+ 
+ 	/* Prepare for output conversion of parameters used in prepared stmt. */
+ 	ffstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+ 	ffstate->p_nums = 0;
+ 
+ 	/* Only one transmittable parameter will be ctid */
+ 	getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ 	fmgr_info(typefnoid, &ffstate->p_flinfo[ffstate->p_nums]);
+ 	ffstate->p_nums++;
+ 
+ 	erm->ermExtra = ffstate;
+ }
+ 
+ /*
+  * finish_foreign_fetch_state
+  *		Finish an execution state for fetching/locking foreign rows
+  */
+ static void
+ finish_foreign_fetch_state(EState *estate, ExecRowMark *erm)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->ermExtra;
+ 
+ 	/* If we created a prepared statement, destroy it */
+ 	if (ffstate->p_name)
+ 	{
+ 		char		sql[64];
+ 		PGresult   *res;
+ 
+ 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", ffstate->p_name);
+ 
+ 		/*
+ 		 * We don't use a PG_TRY block here, so be careful not to throw error
+ 		 * without releasing the PGresult.
+ 		 */
+ 		res = PQexec(ffstate->conn, sql);
+ 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ 			pgfdw_report_error(ERROR, res, ffstate->conn, true, sql);
+ 		PQclear(res);
+ 		ffstate->p_name = NULL;
+ 	}
+ 
+ 	/* Release remote connection */
+ 	ReleaseConnection(ffstate->conn);
+ 	ffstate->conn = NULL;
+ }
+ 
+ /*
   * postgresAnalyzeForeignTable
   *		Test whether analyzing this foreign table is supported
   */