(2019/03/04 12:10), Etsuro Fujita wrote:
(2019/03/02 3:57), Andres Freund wrote:
FWIW, I pushed the EPQ patch, doing this conversion blindly. It'd be
awesome if you'd check that it actually works...

I'll start the work later this week. I think I can post an (initial)
report on that next week, maybe.

Here is an updated version of the patch [1]. This version doesn't allow joins to be pushed down to the remote server if there is a possibility that EPQ will be executed, but I think it is still useful for testing the EPQ patch. I haven't looked into the EPQ patch in detail yet, but I tested it with the attached patch and couldn't find any issues with it.

Best regards,
Etsuro Fujita

[1] https://www.postgresql.org/message-id/16016.1431455074%40sss.pgh.pa.us
*** a/contrib/postgres_fdw/deparse.c
--- b/contrib/postgres_fdw/deparse.c
***************
*** 1216,1222 **** deparseLockingClause(deparse_expr_cxt *context)
  		{
  			PlanRowMark *rc = get_plan_rowmark(root->rowMarks, relid);
  
! 			if (rc)
  			{
  				/*
  				 * Relation is specified as a FOR UPDATE/SHARE target, so
--- 1216,1222 ----
  		{
  			PlanRowMark *rc = get_plan_rowmark(root->rowMarks, relid);
  
! 			if (rc && rc->markType == ROW_MARK_COPY)
  			{
  				/*
  				 * Relation is specified as a FOR UPDATE/SHARE target, so
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 32,37 ****
--- 32,38 ----
  #include "optimizer/pathnode.h"
  #include "optimizer/paths.h"
  #include "optimizer/planmain.h"
+ #include "optimizer/prep.h"
  #include "optimizer/restrictinfo.h"
  #include "optimizer/tlist.h"
  #include "parser/parsetree.h"
***************
*** 70,75 **** enum FdwScanPrivateIndex
--- 71,80 ----
  	FdwScanPrivateRetrievedAttrs,
  	/* Integer representing the desired fetch_size */
  	FdwScanPrivateFetchSize,
+ 	/* SQL statement to re-fetch/lock a single row remotely (as a String node) */
+ 	FdwScanPrivateSelectSql2,
+ 	/* Integer list of attribute numbers retrieved by SELECT */
+ 	FdwScanPrivateRetrievedAttrs2,
  
  	/*
  	 * String describing join i.e. names of relations being joined and types
***************
*** 222,227 **** typedef struct PgFdwDirectModifyState
--- 227,256 ----
  	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
  } PgFdwDirectModifyState;
  
+ /*
+  * Execution state for fetching/locking foreign rows.
+  */
+ typedef struct PgFdwFetchState
+ {
+ 	Relation	rel;			/* relcache entry for the foreign table */
+ 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
+ 
+ 	/* for remote query execution */
+ 	PGconn	   *conn;			/* connection for the fetch */
+ 	char	   *p_name;			/* name of prepared statement, if created */
+ 
+ 	/* extracted fdw_private data */
+ 	char	   *query;			/* text of SELECT command */
+ 	List	   *retrieved_attrs;	/* attr numbers retrieved by SELECT */
+ 
+ 	/* info about parameters for prepared statement */
+ 	int			p_nums;			/* number of parameters to transmit */
+ 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
+ 
+ 	/* working memory context */
+ 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+ } PgFdwFetchState;
+ 
  /*
   * Workspace for analyzing a foreign table.
   */
***************
*** 333,338 **** static bool postgresPlanDirectModify(PlannerInfo *root,
--- 362,374 ----
  static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
  static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
  static void postgresEndDirectModify(ForeignScanState *node);
+ static RowMarkType postgresGetForeignRowMarkType(RangeTblEntry *rte,
+ 							  LockClauseStrength strength);
+ static void postgresRefetchForeignRow(EState *estate,
+ 						  ExecRowMark *erm,
+ 						  Datum rowid,
+ 						  TupleTableSlot *slot,
+ 						  bool *updated);
  static void postgresExplainForeignScan(ForeignScanState *node,
  						   ExplainState *es);
  static void postgresExplainForeignModify(ModifyTableState *mtstate,
***************
*** 379,384 **** static void get_remote_estimate(const char *sql,
--- 415,423 ----
  static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  						  EquivalenceClass *ec, EquivalenceMember *em,
  						  void *arg);
+ static List *create_foreign_fetch_info(PlannerInfo *root,
+ 						  RelOptInfo *baserel,
+ 						  RowMarkType markType);
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
***************
*** 396,405 **** static TupleTableSlot *execute_foreign_modify(EState *estate,
  					   CmdType operation,
  					   TupleTableSlot *slot,
  					   TupleTableSlot *planSlot);
! static void prepare_foreign_modify(PgFdwModifyState *fmstate);
! static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
  static void finish_foreign_modify(PgFdwModifyState *fmstate);
--- 435,447 ----
  					   CmdType operation,
  					   TupleTableSlot *slot,
  					   TupleTableSlot *planSlot);
! static char *setup_prep_stmt(PGconn *conn, char *query);
! static const char **convert_prep_stmt_params(ItemPointer tupleid,
! 						 TupleTableSlot *slot,
! 						 int p_nums,
! 						 FmgrInfo *p_flinfo,
! 						 List *target_attrs,
! 						 MemoryContext temp_context);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
  static void finish_foreign_modify(PgFdwModifyState *fmstate);
***************
*** 424,429 **** static void process_query_params(ExprContext *econtext,
--- 466,476 ----
  					 FmgrInfo *param_flinfo,
  					 List *param_exprs,
  					 const char **param_values);
+ static void init_foreign_fetch_state(EState *estate,
+ 						 ExecRowMark *erm,
+ 						 List *fdw_private,
+ 						 int eflags);
+ static void finish_foreign_fetch_state(EState *estate, ExecRowMark *erm);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  							  HeapTuple *rows, int targrows,
  							  double *totalrows,
***************
*** 493,500 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
  	routine->IterateDirectModify = postgresIterateDirectModify;
  	routine->EndDirectModify = postgresEndDirectModify;
  
! 	/* Function for EvalPlanQual rechecks */
  	routine->RecheckForeignScan = postgresRecheckForeignScan;
  	/* Support functions for EXPLAIN */
  	routine->ExplainForeignScan = postgresExplainForeignScan;
  	routine->ExplainForeignModify = postgresExplainForeignModify;
--- 540,550 ----
  	routine->IterateDirectModify = postgresIterateDirectModify;
  	routine->EndDirectModify = postgresEndDirectModify;
  
! 	/* Functions for SELECT FOR UPDATE/SHARE row locking */
! 	routine->GetForeignRowMarkType = postgresGetForeignRowMarkType;
! 	routine->RefetchForeignRow = postgresRefetchForeignRow;
  	routine->RecheckForeignScan = postgresRecheckForeignScan;
+ 
  	/* Support functions for EXPLAIN */
  	routine->ExplainForeignScan = postgresExplainForeignScan;
  	routine->ExplainForeignModify = postgresExplainForeignModify;
***************
*** 1149,1154 **** postgresGetForeignPlan(PlannerInfo *root,
--- 1199,1206 ----
  	List	   *fdw_recheck_quals = NIL;
  	List	   *retrieved_attrs;
  	StringInfoData sql;
+ 	PlanRowMark *rc = NULL;
+ 	List	   *fdw_private2 = NIL;
  	ListCell   *lc;
  
  	if (IS_SIMPLE_REL(foreignrel))
***************
*** 1311,1316 **** postgresGetForeignPlan(PlannerInfo *root,
--- 1363,1383 ----
  	fdw_private = list_make3(makeString(sql.data),
  							 retrieved_attrs,
  							 makeInteger(fpinfo->fetch_size));
+ 
+ 	/* Append information about late row locking for rowmarked rels. */
+ 	if (root->rowMarks && IS_SIMPLE_REL(foreignrel))
+ 		rc = get_plan_rowmark(root->rowMarks, foreignrel->relid);
+ 	if (rc)
+ 	{
+ 		Assert(rc->markType != ROW_MARK_COPY);
+ 		fdw_private2 = create_foreign_fetch_info(root,
+ 												 foreignrel,
+ 												 rc->markType);
+ 	}
+ 	else
+ 		fdw_private2 = list_make2(NULL, NIL);
+ 	fdw_private = list_concat(fdw_private, fdw_private2);
+ 
  	if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
  		fdw_private = lappend(fdw_private,
  							  makeString(fpinfo->relation_name->data));
***************
*** 1343,1348 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 1410,1416 ----
  	EState	   *estate = node->ss.ps.state;
  	PgFdwScanState *fsstate;
  	RangeTblEntry *rte;
+ 	ExecRowMark *erm;
  	Oid			userid;
  	ForeignTable *table;
  	UserMapping *user;
***************
*** 1433,1438 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 1501,1516 ----
  							 &fsstate->param_flinfo,
  							 &fsstate->param_exprs,
  							 &fsstate->param_values);
+ 
+ 	/*
+ 	 * Initialize state for fetching/locking foreign rows if needed.
+ 	 */
+ 	if (fsplan->scan.scanrelid > 0)
+ 	{
+ 		erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ 		if (erm && erm->relation && erm->ermExtra == NULL)
+ 			init_foreign_fetch_state(estate, erm, fsplan->fdw_private, eflags);
+ 	}
  }
  
  /*
***************
*** 1539,1545 **** postgresReScanForeignScan(ForeignScanState *node)
--- 1617,1626 ----
  static void
  postgresEndForeignScan(ForeignScanState *node)
  {
+ 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+ 	EState	   *estate = node->ss.ps.state;
  	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ 	ExecRowMark *erm;
  
  	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
  	if (fsstate == NULL)
***************
*** 1553,1558 **** postgresEndForeignScan(ForeignScanState *node)
--- 1634,1649 ----
  	ReleaseConnection(fsstate->conn);
  	fsstate->conn = NULL;
  
+ 	/*
+ 	 * Finish state for fetching/locking foreign rows if needed.
+ 	 */
+ 	if (fsplan->scan.scanrelid > 0)
+ 	{
+ 		erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ 		if (erm && erm->relation && erm->ermExtra != NULL)
+ 			finish_foreign_fetch_state(estate, erm);
+ 	}
+ 
  	/* MemoryContexts will be deleted automatically. */
  }
  
***************
*** 2401,2406 **** postgresEndDirectModify(ForeignScanState *node)
--- 2492,2600 ----
  	/* MemoryContext will be deleted automatically. */
  }
  
+ /*
+  * postgresGetForeignRowMarkType
+  *		Get rowmark type to use for a particular table
+  */
+ static RowMarkType
+ postgresGetForeignRowMarkType(RangeTblEntry *rte, LockClauseStrength strength)
+ {
+ 	switch (strength)
+ 	{
+ 		case LCS_NONE:
+ 			return ROW_MARK_REFERENCE;
+ 		case LCS_FORKEYSHARE:
+ 			return ROW_MARK_KEYSHARE;
+ 		case LCS_FORSHARE:
+ 			return ROW_MARK_SHARE;
+ 		case LCS_FORNOKEYUPDATE:
+ 			return ROW_MARK_NOKEYEXCLUSIVE;
+ 		case LCS_FORUPDATE:
+ 			return ROW_MARK_EXCLUSIVE;
+ 	}
+ 	return ROW_MARK_COPY;		/* shouldn't happen */
+ }
+ 
+ /*
+  * postgresRefetchForeignRow
+  *		Re-fetch one tuple from a foreign table, possibly locking it
+  */
+ static void
+ postgresRefetchForeignRow(EState *estate,
+ 						  ExecRowMark *erm,
+ 						  Datum rowid,
+ 						  TupleTableSlot *slot,
+ 						  bool *updated)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->ermExtra;
+ 	ItemPointer tupleid = (ItemPointer) DatumGetPointer(rowid);
+ 	const char **p_values;
+ 	PGresult   *res;
+ 	HeapTuple	tuple;
+ 
+ 	/* Set up the prepared statement on the remote server, if we didn't yet */
+ 	if (!ffstate->p_name)
+ 		ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+ 
+ 	/* Convert parameters needed by prepared statement to text form */
+ 	p_values = convert_prep_stmt_params(tupleid, NULL,
+ 										ffstate->p_nums,
+ 										ffstate->p_flinfo,
+ 										NIL,
+ 										ffstate->temp_cxt);
+ 
+ 	/*
+ 	 * Execute the prepared statement.
+ 	 */
+ 	if (!PQsendQueryPrepared(ffstate->conn,
+ 							 ffstate->p_name,
+ 							 ffstate->p_nums,
+ 							 p_values,
+ 							 NULL,
+ 							 NULL,
+ 							 0))
+ 		pgfdw_report_error(ERROR, NULL, ffstate->conn, false, ffstate->query);
+ 
+ 	/*
+ 	 * Get the result, and check for success.
+ 	 *
+ 	 * We don't use a PG_TRY block here, so be careful not to throw error
+ 	 * without releasing the PGresult.
+ 	 */
+ 	res = pgfdw_get_result(ffstate->conn, ffstate->query);
+ 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+ 
+ 	/* PGresult must be released before leaving this function. */
+ 	PG_TRY();
+ 	{
+ 		/* Create the tuple */
+ 		tuple = make_tuple_from_result_row(res, 0,
+ 										   ffstate->rel,
+ 										   ffstate->attinmeta,
+ 										   ffstate->retrieved_attrs,
+ 										   NULL,
+ 										   ffstate->temp_cxt);
+ 		tuple->t_self = *tupleid;
+ 		tuple->t_tableOid = erm->relid;
+ 
+ 		PQclear(res);
+ 		res = NULL;
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		if (res)
+ 			PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+ 
+ 	MemoryContextReset(ffstate->temp_cxt);
+ 
+ 	/* Store the tuple into the given slot */
+ 	ExecStoreHeapTuple(tuple, slot, true);
+ }
+ 
  /*
   * postgresExplainForeignScan
   *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
***************
*** 2966,2971 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 3160,3214 ----
  	return true;
  }
  
+ /*
+  * Create the FDW-private information for fetching/locking foreign rows.
+  */
+ static List *
+ create_foreign_fetch_info(PlannerInfo *root,
+ 						  RelOptInfo *baserel,
+ 						  RowMarkType markType)
+ {
+ 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
+ 	StringInfoData sql;
+ 	List	   *retrieved_attrs;
+ 	Bitmapset  *attrs_used = NULL;
+ 	Bitmapset  *save_attrs_used;
+ 
+ 	/*
+ 	 * Build the query string to be sent for execution.
+ 	 */
+ 	initStringInfo(&sql);
+ 	/* Add a whole-row var to attrs_used to retrieve all the columns. */
+ 	attrs_used = bms_add_member(attrs_used,
+ 								0 - FirstLowInvalidHeapAttributeNumber);
+ 	save_attrs_used = fpinfo->attrs_used;
+ 	fpinfo->attrs_used = attrs_used;
+ 	deparseSelectStmtForRel(&sql, root, baserel, NIL, NIL, NIL, false,
+ 							&retrieved_attrs, NULL);
+ 	fpinfo->attrs_used = save_attrs_used;
+ 	appendStringInfoString(&sql, " WHERE ctid = $1");
+ 
+ 	switch (markType)
+ 	{
+ 		case ROW_MARK_EXCLUSIVE:
+ 		case ROW_MARK_NOKEYEXCLUSIVE:
+ 			appendStringInfoString(&sql, " FOR UPDATE");
+ 			break;
+ 		case ROW_MARK_SHARE:
+ 		case ROW_MARK_KEYSHARE:
+ 			appendStringInfoString(&sql, " FOR SHARE");
+ 			break;
+ 		default:
+ 			break;
+ 	}
+ 
+ 	/*
+ 	 * Build the fdw_private list that will be available to the executor.
+ 	 * Items in the list must match enum FdwScanPrivateIndex, above.
+ 	 */
+ 	return list_make2(makeString(sql.data), retrieved_attrs);
+ }
+ 
  /*
   * Create cursor for node's query with current parameter values.
   */
***************
*** 3311,3317 **** execute_foreign_modify(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/*
  	 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
--- 3554,3560 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/*
  	 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
***************
*** 3331,3337 **** execute_foreign_modify(EState *estate,
  	}
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate, ctid, slot);
  
  	/*
  	 * Execute the prepared statement.
--- 3574,3584 ----
  	}
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(ctid, slot,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement.
***************
*** 3378,3388 **** execute_foreign_modify(EState *estate,
  }
  
  /*
!  * prepare_foreign_modify
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
   */
! static void
! prepare_foreign_modify(PgFdwModifyState *fmstate)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
--- 3625,3636 ----
  }
  
  /*
!  * setup_prep_stmt
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
+  *		or re-fetching tuples for EvalPlanQual rechecking
   */
! static char *
! setup_prep_stmt(PGconn *conn, char *query)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
***************
*** 3390,3396 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(fmstate->conn));
  	p_name = pstrdup(prep_name);
  
  	/*
--- 3638,3644 ----
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(conn));
  	p_name = pstrdup(prep_name);
  
  	/*
***************
*** 3400,3411 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  	 * the prepared statements we use in this module are simple enough that
  	 * the remote server will make the right choices.
  	 */
! 	if (!PQsendPrepare(fmstate->conn,
! 					   p_name,
! 					   fmstate->query,
! 					   0,
! 					   NULL))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
  
  	/*
  	 * Get the result, and check for success.
--- 3648,3655 ----
  	 * the prepared statements we use in this module are simple enough that
  	 * the remote server will make the right choices.
  	 */
! 	if (!PQsendPrepare(conn, p_name, query, 0, NULL))
! 		pgfdw_report_error(ERROR, NULL, conn, false, query);
  
  	/*
  	 * Get the result, and check for success.
***************
*** 3413,3425 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	fmstate->p_name = p_name;
  }
  
  /*
--- 3657,3669 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = pgfdw_get_result(conn, query);
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, conn, true, query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	return p_name;
  }
  
  /*
***************
*** 3432,3467 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
  
! 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && fmstate->target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, fmstate->target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
--- 3676,3714 ----
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(ItemPointer tupleid,
! 						 TupleTableSlot *slot,
! 						 int p_nums,
! 						 FmgrInfo *p_flinfo,
! 						 List *target_attrs,
! 						 MemoryContext temp_context)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(temp_context);
  
! 	p_values = (const char **) palloc(sizeof(char *) * p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
***************
*** 3471,3477 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
--- 3718,3724 ----
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
***************
*** 3479,3485 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == fmstate->p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
--- 3726,3732 ----
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
***************
*** 4067,4072 **** process_query_params(ExprContext *econtext,
--- 4314,4418 ----
  	reset_transmission_modes(nestlevel);
  }
  
+ /*
+  * init_foreign_fetch_state
+  *		Initialize an execution state for fetching/locking foreign rows
+  */
+ static void
+ init_foreign_fetch_state(EState *estate,
+ 						 ExecRowMark *erm,
+ 						 List *fdw_private,
+ 						 int eflags)
+ {
+ 	PgFdwFetchState *ffstate;
+ 	Relation	rel = erm->relation;
+ 	RangeTblEntry *rte;
+ 	Oid			userid;
+ 	ForeignTable *table;
+ 	UserMapping *user;
+ 	Oid			typefnoid;
+ 	bool		isvarlena;
+ 
+ 	/* Begin constructing PgFdwFetchState. */
+ 	ffstate = (PgFdwFetchState *) palloc0(sizeof(PgFdwFetchState));
+ 	ffstate->rel = rel;
+ 
+ 	/*
+ 	 * Identify which user to do the remote access as.  This should match what
+ 	 * ExecCheckRTEPerms() does.
+ 	 */
+ 	rte = rt_fetch(erm->rti, estate->es_range_table);
+ 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+ 
+ 	/* Get info about foreign table. */
+ 	table = GetForeignTable(RelationGetRelid(rel));
+ 	user = GetUserMapping(userid, table->serverid);
+ 
+ 	/* Open connection; report that we'll create a prepared statement. */
+ 	ffstate->conn = GetConnection(user, true);
+ 	ffstate->p_name = NULL;		/* prepared statement not made yet */
+ 
+ 	/* Deconstruct fdw_private data. */
+ 	ffstate->query = strVal(list_nth(fdw_private,
+ 									 FdwScanPrivateSelectSql2));
+ 	ffstate->retrieved_attrs = (List *) list_nth(fdw_private,
+ 											  FdwScanPrivateRetrievedAttrs2);
+ 
+ 	/* Create context for per-tuple temp workspace. */
+ 	ffstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ 											  "postgres_fdw temporary data",
+ 											  ALLOCSET_SMALL_MINSIZE,
+ 											  ALLOCSET_SMALL_INITSIZE,
+ 											  ALLOCSET_SMALL_MAXSIZE);
+ 
+ 	/* Prepare for input conversion of SELECT results. */
+ 	ffstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
+ 
+ 	/* Prepare for output conversion of parameters used in prepared stmt. */
+ 	ffstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+ 	ffstate->p_nums = 0;
+ 
+ 	/* The only transmittable parameter is ctid */
+ 	getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ 	fmgr_info(typefnoid, &ffstate->p_flinfo[ffstate->p_nums]);
+ 	ffstate->p_nums++;
+ 
+ 	erm->ermExtra = ffstate;
+ }
+ 
+ /*
+  * finish_foreign_fetch_state
+  *		Finish an execution state for fetching/locking foreign rows
+  */
+ static void
+ finish_foreign_fetch_state(EState *estate, ExecRowMark *erm)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->ermExtra;
+ 
+ 	/* If we created a prepared statement, destroy it */
+ 	if (ffstate->p_name)
+ 	{
+ 		char		sql[64];
+ 		PGresult   *res;
+ 
+ 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", ffstate->p_name);
+ 
+ 		/*
+ 		 * We don't use a PG_TRY block here, so be careful not to throw error
+ 		 * without releasing the PGresult.
+ 		 */
+ 		res = PQexec(ffstate->conn, sql);
+ 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ 			pgfdw_report_error(ERROR, res, ffstate->conn, true, sql);
+ 		PQclear(res);
+ 		ffstate->p_name = NULL;
+ 	}
+ 
+ 	/* Release remote connection */
+ 	ReleaseConnection(ffstate->conn);
+ 	ffstate->conn = NULL;
+ }
+ 
  /*
   * postgresAnalyzeForeignTable
   *		Test whether analyzing this foreign table is supported
***************
*** 5099,5106 **** postgresGetForeignJoinPaths(PlannerInfo *root,
  	int			width;
  	Cost		startup_cost;
  	Cost		total_cost;
! 	Path	   *epq_path;		/* Path to create plan to be executed when
! 								 * EvalPlanQual gets triggered. */
  
  	/*
  	 * Skip if this join combination has been considered already.
--- 5445,5452 ----
  	int			width;
  	Cost		startup_cost;
  	Cost		total_cost;
! 	Path	   *epq_path = NULL;	/* Path to create plan to be executed when
! 									 * EvalPlanQual gets triggered. */
  
  	/*
  	 * Skip if this join combination has been considered already.
***************
*** 5108,5113 **** postgresGetForeignJoinPaths(PlannerInfo *root,
--- 5454,5468 ----
  	if (joinrel->fdw_private)
  		return;
  
+ 	/*
+ 	 * Don't allow pushing down joins to the remote if there is a possibility
+ 	 * that EvalPlanQual will be executed.
+ 	 */
+ 	if (root->parse->commandType == CMD_DELETE ||
+ 		root->parse->commandType == CMD_UPDATE ||
+ 		root->rowMarks)
+ 		return;
+ 
  	/*
  	 * This code does not work for joins with lateral references, since those
  	 * must have parameterized paths, which we don't generate yet.
***************
*** 5128,5157 **** postgresGetForeignJoinPaths(PlannerInfo *root,
  	/* attrs_used is only for base relations. */
  	fpinfo->attrs_used = NULL;
  
- 	/*
- 	 * If there is a possibility that EvalPlanQual will be executed, we need
- 	 * to be able to reconstruct the row using scans of the base relations.
- 	 * GetExistingLocalJoinPath will find a suitable path for this purpose in
- 	 * the path list of the joinrel, if one exists.  We must be careful to
- 	 * call it before adding any ForeignPath, since the ForeignPath might
- 	 * dominate the only suitable local path available.  We also do it before
- 	 * calling foreign_join_ok(), since that function updates fpinfo and marks
- 	 * it as pushable if the join is found to be pushable.
- 	 */
- 	if (root->parse->commandType == CMD_DELETE ||
- 		root->parse->commandType == CMD_UPDATE ||
- 		root->rowMarks)
- 	{
- 		epq_path = GetExistingLocalJoinPath(joinrel);
- 		if (!epq_path)
- 		{
- 			elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
- 			return;
- 		}
- 	}
- 	else
- 		epq_path = NULL;
- 
  	if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
  	{
  		/* Free path required for EPQ if we copied one; we don't need it now */
--- 5483,5488 ----

Reply via email to