(2019/03/04 12:10), Etsuro Fujita wrote:
> (2019/03/02 3:57), Andres Freund wrote:
>> FWIW, I pushed the EPQ patch, doing this conversion blindly. It'd be
>> awesome if you'd check that it actually works...
>
> I'll start the work later this week. I think I can post an (initial)
> report on that next week, maybe.
Here is an updated version of the patch [1]. This version doesn't allow
pushing down joins to the remote server if there is a possibility that EPQ
will be executed, but I think it is still useful for testing the EPQ patch.
I haven't looked into the EPQ patch in detail yet, but I tested it with the
attached patch and couldn't find any issues.
Best regards,
Etsuro Fujita
[1] https://www.postgresql.org/message-id/16016.1431455074%40sss.pgh.pa.us
*** a/contrib/postgres_fdw/deparse.c
--- b/contrib/postgres_fdw/deparse.c
***************
*** 1216,1222 **** deparseLockingClause(deparse_expr_cxt *context)
{
PlanRowMark *rc = get_plan_rowmark(root->rowMarks, relid);
! if (rc)
{
/*
* Relation is specified as a FOR UPDATE/SHARE target, so
--- 1216,1222 ----
{
PlanRowMark *rc = get_plan_rowmark(root->rowMarks, relid);
! if (rc && rc->markType == ROW_MARK_COPY)
{
/*
* Relation is specified as a FOR UPDATE/SHARE target, so
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 32,37 ****
--- 32,38 ----
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
+ #include "optimizer/prep.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
***************
*** 70,75 **** enum FdwScanPrivateIndex
--- 71,80 ----
FdwScanPrivateRetrievedAttrs,
/* Integer representing the desired fetch_size */
FdwScanPrivateFetchSize,
+ /* SQL statement to execute remotely (as a String node) */
+ FdwScanPrivateSelectSql2,
+ /* Integer list of attribute numbers retrieved by SELECT */
+ FdwScanPrivateRetrievedAttrs2,
/*
* String describing join i.e. names of relations being joined and types
***************
*** 222,227 **** typedef struct PgFdwDirectModifyState
--- 227,256 ----
MemoryContext temp_cxt; /* context for per-tuple temporary data */
} PgFdwDirectModifyState;
+ /*
+ * Execution state for fetching/locking foreign rows.
+ *
+ * One of these is built by init_foreign_fetch_state() and stored in the
+ * ExecRowMark's ermExtra field.  postgresRefetchForeignRow() reads it back
+ * from there to re-fetch a single row identified by ctid, and
+ * finish_foreign_fetch_state() deallocates the prepared statement (if any)
+ * and releases the connection.
+ */
+ typedef struct PgFdwFetchState
+ {
+ Relation rel; /* relcache entry for the foreign table */
+ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
+
+ /* for remote query execution */
+ PGconn *conn; /* connection for the fetch */
+ char *p_name; /* name of prepared statement, if created;
+ * NULL until the first re-fetch needs it */
+
+ /* extracted fdw_private data */
+ char *query; /* text of SELECT command */
+ List *retrieved_attrs; /* attr numbers retrieved by SELECT */
+
+ /* info about parameters for prepared statement */
+ int p_nums; /* number of parameters to transmit */
+ FmgrInfo *p_flinfo; /* output conversion functions for them */
+
+ /* working memory context */
+ MemoryContext temp_cxt; /* context for per-tuple temporary data */
+ } PgFdwFetchState;
+
/*
* Workspace for analyzing a foreign table.
*/
***************
*** 333,338 **** static bool postgresPlanDirectModify(PlannerInfo *root,
--- 362,374 ----
static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
static void postgresEndDirectModify(ForeignScanState *node);
+ static RowMarkType postgresGetForeignRowMarkType(RangeTblEntry *rte,
+ LockClauseStrength strength);
+ static void postgresRefetchForeignRow(EState *estate,
+ ExecRowMark *erm,
+ Datum rowid,
+ TupleTableSlot *slot,
+ bool *updated);
static void postgresExplainForeignScan(ForeignScanState *node,
ExplainState *es);
static void postgresExplainForeignModify(ModifyTableState *mtstate,
***************
*** 379,384 **** static void get_remote_estimate(const char *sql,
--- 415,423 ----
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
+ static List *create_foreign_fetch_info(PlannerInfo *root,
+ RelOptInfo *baserel,
+ RowMarkType markType);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
***************
*** 396,405 **** static TupleTableSlot *execute_foreign_modify(EState *estate,
CmdType operation,
TupleTableSlot *slot,
TupleTableSlot *planSlot);
! static void prepare_foreign_modify(PgFdwModifyState *fmstate);
! static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
! ItemPointer tupleid,
! TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
--- 435,447 ----
CmdType operation,
TupleTableSlot *slot,
TupleTableSlot *planSlot);
! static char *setup_prep_stmt(PGconn *conn, char *query);
! static const char **convert_prep_stmt_params(ItemPointer tupleid,
! TupleTableSlot *slot,
! int p_nums,
! FmgrInfo *p_flinfo,
! List *target_attrs,
! MemoryContext temp_context);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
***************
*** 424,429 **** static void process_query_params(ExprContext *econtext,
--- 466,476 ----
FmgrInfo *param_flinfo,
List *param_exprs,
const char **param_values);
+ static void init_foreign_fetch_state(EState *estate,
+ ExecRowMark *erm,
+ List *fdw_private,
+ int eflags);
+ static void finish_foreign_fetch_state(EState *estate, ExecRowMark *erm);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
HeapTuple *rows, int targrows,
double *totalrows,
***************
*** 493,500 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->IterateDirectModify = postgresIterateDirectModify;
routine->EndDirectModify = postgresEndDirectModify;
! /* Function for EvalPlanQual rechecks */
routine->RecheckForeignScan = postgresRecheckForeignScan;
/* Support functions for EXPLAIN */
routine->ExplainForeignScan = postgresExplainForeignScan;
routine->ExplainForeignModify = postgresExplainForeignModify;
--- 540,550 ----
routine->IterateDirectModify = postgresIterateDirectModify;
routine->EndDirectModify = postgresEndDirectModify;
! /* Functions for SELECT FOR UPDATE/SHARE row locking */
! routine->GetForeignRowMarkType = postgresGetForeignRowMarkType;
! routine->RefetchForeignRow = postgresRefetchForeignRow;
routine->RecheckForeignScan = postgresRecheckForeignScan;
+
/* Support functions for EXPLAIN */
routine->ExplainForeignScan = postgresExplainForeignScan;
routine->ExplainForeignModify = postgresExplainForeignModify;
***************
*** 1149,1154 **** postgresGetForeignPlan(PlannerInfo *root,
--- 1199,1206 ----
List *fdw_recheck_quals = NIL;
List *retrieved_attrs;
StringInfoData sql;
+ PlanRowMark *rc = NULL;
+ List *fdw_private2 = NIL;
ListCell *lc;
if (IS_SIMPLE_REL(foreignrel))
***************
*** 1311,1316 **** postgresGetForeignPlan(PlannerInfo *root,
--- 1363,1383 ----
fdw_private = list_make3(makeString(sql.data),
retrieved_attrs,
makeInteger(fpinfo->fetch_size));
+
+ /* Append information about late row locking for rowmarked rels. */
+ if (root->rowMarks && IS_SIMPLE_REL(foreignrel))
+ rc = get_plan_rowmark(root->rowMarks, foreignrel->relid);
+ if (rc)
+ {
+ Assert(rc->markType != ROW_MARK_COPY);
+ fdw_private2 = create_foreign_fetch_info(root,
+ foreignrel,
+ rc->markType);
+ }
+ else
+ fdw_private2 = list_make2(NULL, NIL);
+ fdw_private = list_concat(fdw_private, fdw_private2);
+
if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
fdw_private = lappend(fdw_private,
makeString(fpinfo->relation_name->data));
***************
*** 1343,1348 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 1410,1416 ----
EState *estate = node->ss.ps.state;
PgFdwScanState *fsstate;
RangeTblEntry *rte;
+ ExecRowMark *erm;
Oid userid;
ForeignTable *table;
UserMapping *user;
***************
*** 1433,1438 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 1501,1516 ----
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
+
+ /*
+ * Initialize state for fetching/locking foreign rows if needed.
+ */
+ if (fsplan->scan.scanrelid > 0)
+ {
+ erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ if (erm && erm->relation && erm->ermExtra == NULL)
+ init_foreign_fetch_state(estate, erm, fsplan->fdw_private, eflags);
+ }
}
/*
***************
*** 1539,1545 **** postgresReScanForeignScan(ForeignScanState *node)
--- 1617,1626 ----
static void
postgresEndForeignScan(ForeignScanState *node)
{
+ ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+ EState *estate = node->ss.ps.state;
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ ExecRowMark *erm;
/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
if (fsstate == NULL)
***************
*** 1553,1558 **** postgresEndForeignScan(ForeignScanState *node)
--- 1634,1649 ----
ReleaseConnection(fsstate->conn);
fsstate->conn = NULL;
+ /*
+ * Finish state for fetching/locking foreign rows if needed.
+ */
+ if (fsplan->scan.scanrelid > 0)
+ {
+ erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ if (erm && erm->relation && erm->ermExtra != NULL)
+ finish_foreign_fetch_state(estate, erm);
+ }
+
/* MemoryContexts will be deleted automatically. */
}
***************
*** 2401,2406 **** postgresEndDirectModify(ForeignScanState *node)
--- 2492,2600 ----
/* MemoryContext will be deleted automatically. */
}
+ /*
+ * postgresGetForeignRowMarkType
+ *		Report which rowmark type to use for a foreign table
+ *
+ * Each explicit locking strength is translated to the corresponding
+ * row-lock mark type; a table with no locking clause (LCS_NONE) gets
+ * ROW_MARK_REFERENCE.  The RTE itself is not consulted.
+ */
+ static RowMarkType
+ postgresGetForeignRowMarkType(RangeTblEntry *rte, LockClauseStrength strength)
+ {
+ 	/* Fallback for an unrecognized strength; shouldn't happen */
+ 	RowMarkType	marktype = ROW_MARK_COPY;
+
+ 	switch (strength)
+ 	{
+ 		case LCS_NONE:
+ 			marktype = ROW_MARK_REFERENCE;
+ 			break;
+ 		case LCS_FORKEYSHARE:
+ 			marktype = ROW_MARK_KEYSHARE;
+ 			break;
+ 		case LCS_FORSHARE:
+ 			marktype = ROW_MARK_SHARE;
+ 			break;
+ 		case LCS_FORNOKEYUPDATE:
+ 			marktype = ROW_MARK_NOKEYEXCLUSIVE;
+ 			break;
+ 		case LCS_FORUPDATE:
+ 			marktype = ROW_MARK_EXCLUSIVE;
+ 			break;
+ 	}
+
+ 	return marktype;
+ }
+
+ /*
+ * postgresRefetchForeignRow
+ *		Re-fetch one tuple from a foreign table, possibly locking it
+ *
+ * The row is identified by the ctid passed in rowid and is fetched with a
+ * prepared statement built from ffstate->query; the statement is created
+ * on the remote server the first time it is needed.
+ *
+ * On success the fetched tuple is stored into slot.  If the remote server
+ * returns no row for that ctid, slot is cleared instead, so the caller can
+ * tell that the row is not available rather than receiving a tuple built
+ * from a nonexistent result row.
+ *
+ * NOTE(review): *updated is never written here and so keeps whatever value
+ * the caller initialized it to.  The FDW API describes it as an output
+ * parameter; confirm that leaving it untouched is intended.
+ */
+ static void
+ postgresRefetchForeignRow(EState *estate,
+ 						  ExecRowMark *erm,
+ 						  Datum rowid,
+ 						  TupleTableSlot *slot,
+ 						  bool *updated)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->ermExtra;
+ 	ItemPointer tupleid = (ItemPointer) DatumGetPointer(rowid);
+ 	const char **p_values;
+ 	PGresult   *res;
+ 	HeapTuple	tuple;
+
+ 	/* Set up the prepared statement on the remote server, if we didn't yet */
+ 	if (!ffstate->p_name)
+ 		ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+
+ 	/*
+ 	 * Convert parameters needed by prepared statement to text form.  The
+ 	 * only parameter is the ctid, so no slot or target attrs are passed.
+ 	 */
+ 	p_values = convert_prep_stmt_params(tupleid, NULL,
+ 										ffstate->p_nums,
+ 										ffstate->p_flinfo,
+ 										NIL,
+ 										ffstate->temp_cxt);
+
+ 	/*
+ 	 * Execute the prepared statement.
+ 	 */
+ 	if (!PQsendQueryPrepared(ffstate->conn,
+ 							 ffstate->p_name,
+ 							 ffstate->p_nums,
+ 							 p_values,
+ 							 NULL,
+ 							 NULL,
+ 							 0))
+ 		pgfdw_report_error(ERROR, NULL, ffstate->conn, false, ffstate->query);
+
+ 	/*
+ 	 * Get the result, and check for success.
+ 	 *
+ 	 * We don't use a PG_TRY block here, so be careful not to throw error
+ 	 * without releasing the PGresult.
+ 	 */
+ 	res = pgfdw_get_result(ffstate->conn, ffstate->query);
+ 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+
+ 	/*
+ 	 * The query can legitimately come back empty (no row with that ctid is
+ 	 * visible/lockable on the remote side).  Don't try to read row 0 of an
+ 	 * empty result; report "no row" by clearing the slot.
+ 	 */
+ 	if (PQntuples(res) == 0)
+ 	{
+ 		PQclear(res);
+ 		MemoryContextReset(ffstate->temp_cxt);
+ 		ExecClearTuple(slot);
+ 		return;
+ 	}
+
+ 	/* PGresult must be released before leaving this function. */
+ 	PG_TRY();
+ 	{
+ 		/* Create the tuple */
+ 		tuple = make_tuple_from_result_row(res, 0,
+ 										   ffstate->rel,
+ 										   ffstate->attinmeta,
+ 										   ffstate->retrieved_attrs,
+ 										   NULL,
+ 										   ffstate->temp_cxt);
+ 		tuple->t_self = *tupleid;
+ 		tuple->t_tableOid = erm->relid;
+
+ 		PQclear(res);
+ 		res = NULL;
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		if (res)
+ 			PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+
+ 	/* Parameter text and conversion scratch data are no longer needed */
+ 	MemoryContextReset(ffstate->temp_cxt);
+
+ 	/* Store the tuple into the given slot */
+ 	ExecStoreHeapTuple(tuple, slot, true);
+ }
+
/*
* postgresExplainForeignScan
* Produce extra output for EXPLAIN of a ForeignScan on a foreign table
***************
*** 2966,2971 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 3160,3214 ----
return true;
}
+ /*
+ * Create the FDW-private information for fetching/locking foreign rows.
+ *
+ * Builds a "SELECT <all columns> ... WHERE ctid = $1" statement for baserel,
+ * followed by a row-locking clause chosen from markType, and returns it
+ * together with the list of attribute numbers the SELECT retrieves.
+ */
+ static List *
+ create_foreign_fetch_info(PlannerInfo *root,
+ 						  RelOptInfo *baserel,
+ 						  RowMarkType markType)
+ {
+ 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
+ 	StringInfoData buf;
+ 	List	   *fetched_attrs;
+ 	Bitmapset  *wholerow_attrs;
+ 	Bitmapset  *orig_attrs_used;
+ 	const char *lock_clause = NULL;
+
+ 	initStringInfo(&buf);
+
+ 	/*
+ 	 * Deparse the SELECT with attrs_used temporarily replaced by a set that
+ 	 * contains just a whole-row reference, so that every column is
+ 	 * retrieved; put the planner's original set back afterwards.
+ 	 */
+ 	wholerow_attrs = bms_add_member(NULL,
+ 									0 - FirstLowInvalidHeapAttributeNumber);
+ 	orig_attrs_used = fpinfo->attrs_used;
+ 	fpinfo->attrs_used = wholerow_attrs;
+ 	deparseSelectStmtForRel(&buf, root, baserel, NIL, NIL, NIL, false,
+ 							&fetched_attrs, NULL);
+ 	fpinfo->attrs_used = orig_attrs_used;
+
+ 	/* The row to fetch is identified by the single ctid parameter. */
+ 	appendStringInfoString(&buf, " WHERE ctid = $1");
+
+ 	/*
+ 	 * Pick the locking clause.  Both exclusive mark types use FOR UPDATE
+ 	 * and both share mark types use FOR SHARE; any other mark type fetches
+ 	 * without locking.
+ 	 */
+ 	if (markType == ROW_MARK_EXCLUSIVE ||
+ 		markType == ROW_MARK_NOKEYEXCLUSIVE)
+ 		lock_clause = " FOR UPDATE";
+ 	else if (markType == ROW_MARK_SHARE ||
+ 			 markType == ROW_MARK_KEYSHARE)
+ 		lock_clause = " FOR SHARE";
+
+ 	if (lock_clause != NULL)
+ 		appendStringInfoString(&buf, lock_clause);
+
+ 	/*
+ 	 * Build the fdw_private list that will be available to the executor.
+ 	 * Items in the list must match enum FdwScanPrivateIndex, above.
+ 	 */
+ 	return list_make2(makeString(buf.data), fetched_attrs);
+ }
+
/*
* Create cursor for node's query with current parameter values.
*/
***************
*** 3311,3317 **** execute_foreign_modify(EState *estate,
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! prepare_foreign_modify(fmstate);
/*
* For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
--- 3554,3560 ----
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
/*
* For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
***************
*** 3331,3337 **** execute_foreign_modify(EState *estate,
}
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(fmstate, ctid, slot);
/*
* Execute the prepared statement.
--- 3574,3584 ----
}
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(ctid, slot,
! fmstate->p_nums,
! fmstate->p_flinfo,
! fmstate->target_attrs,
! fmstate->temp_cxt);
/*
* Execute the prepared statement.
***************
*** 3378,3388 **** execute_foreign_modify(EState *estate,
}
/*
! * prepare_foreign_modify
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
*/
! static void
! prepare_foreign_modify(PgFdwModifyState *fmstate)
{
char prep_name[NAMEDATALEN];
char *p_name;
--- 3625,3636 ----
}
/*
! * setup_prep_stmt
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
+ * or re-fetching tuples for EvalPlanQual rechecking
*/
! static char *
! setup_prep_stmt(PGconn *conn, char *query)
{
char prep_name[NAMEDATALEN];
char *p_name;
***************
*** 3390,3396 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! GetPrepStmtNumber(fmstate->conn));
p_name = pstrdup(prep_name);
/*
--- 3638,3644 ----
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! GetPrepStmtNumber(conn));
p_name = pstrdup(prep_name);
/*
***************
*** 3400,3411 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
* the prepared statements we use in this module are simple enough that
* the remote server will make the right choices.
*/
! if (!PQsendPrepare(fmstate->conn,
! p_name,
! fmstate->query,
! 0,
! NULL))
! pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
* Get the result, and check for success.
--- 3648,3655 ----
* the prepared statements we use in this module are simple enough that
* the remote server will make the right choices.
*/
! if (!PQsendPrepare(conn, p_name, query, 0, NULL))
! pgfdw_report_error(ERROR, NULL, conn, false, query);
/*
* Get the result, and check for success.
***************
*** 3413,3425 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
! res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
! pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
PQclear(res);
/* This action shows that the prepare has been done. */
! fmstate->p_name = p_name;
}
/*
--- 3657,3669 ----
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
! res = pgfdw_get_result(conn, query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
! pgfdw_report_error(ERROR, res, conn, true, query);
PQclear(res);
/* This action shows that the prepare has been done. */
! return p_name;
}
/*
***************
*** 3432,3467 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
* Data is constructed in temp_cxt; caller should reset that after use.
*/
static const char **
! convert_prep_stmt_params(PgFdwModifyState *fmstate,
! ItemPointer tupleid,
! TupleTableSlot *slot)
{
const char **p_values;
int pindex = 0;
MemoryContext oldcontext;
! oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
! p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
/* 1st parameter should be ctid, if it's in use */
if (tupleid != NULL)
{
/* don't need set_transmission_modes for TID output */
! p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
}
/* get following parameters from slot */
! if (slot != NULL && fmstate->target_attrs != NIL)
{
int nestlevel;
ListCell *lc;
nestlevel = set_transmission_modes();
! foreach(lc, fmstate->target_attrs)
{
int attnum = lfirst_int(lc);
Datum value;
--- 3676,3714 ----
* Data is constructed in temp_cxt; caller should reset that after use.
*/
static const char **
! convert_prep_stmt_params(ItemPointer tupleid,
! TupleTableSlot *slot,
! int p_nums,
! FmgrInfo *p_flinfo,
! List *target_attrs,
! MemoryContext temp_context)
{
const char **p_values;
int pindex = 0;
MemoryContext oldcontext;
! oldcontext = MemoryContextSwitchTo(temp_context);
! p_values = (const char **) palloc(sizeof(char *) * p_nums);
/* 1st parameter should be ctid, if it's in use */
if (tupleid != NULL)
{
/* don't need set_transmission_modes for TID output */
! p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
}
/* get following parameters from slot */
! if (slot != NULL && target_attrs != NIL)
{
int nestlevel;
ListCell *lc;
nestlevel = set_transmission_modes();
! foreach(lc, target_attrs)
{
int attnum = lfirst_int(lc);
Datum value;
***************
*** 3471,3477 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
if (isnull)
p_values[pindex] = NULL;
else
! p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
value);
pindex++;
}
--- 3718,3724 ----
if (isnull)
p_values[pindex] = NULL;
else
! p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
value);
pindex++;
}
***************
*** 3479,3485 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
reset_transmission_modes(nestlevel);
}
! Assert(pindex == fmstate->p_nums);
MemoryContextSwitchTo(oldcontext);
--- 3726,3732 ----
reset_transmission_modes(nestlevel);
}
! Assert(pindex == p_nums);
MemoryContextSwitchTo(oldcontext);
***************
*** 4067,4072 **** process_query_params(ExprContext *econtext,
--- 4314,4418 ----
reset_transmission_modes(nestlevel);
}
+ /*
+ * init_foreign_fetch_state
+ *		Initialize an execution state for fetching/locking foreign rows
+ *
+ * Allocates a PgFdwFetchState for the rowmark's relation, opens the remote
+ * connection, pulls the re-fetch query and its retrieved-attribute list
+ * out of fdw_private, and prepares the conversion machinery.  The result
+ * is stored in erm->ermExtra.  eflags is currently unused.
+ */
+ static void
+ init_foreign_fetch_state(EState *estate,
+ 						 ExecRowMark *erm,
+ 						 List *fdw_private,
+ 						 int eflags)
+ {
+ 	Relation	frel = erm->relation;
+ 	PgFdwFetchState *state;
+ 	RangeTblEntry *rtentry;
+ 	Oid			remote_user;
+ 	ForeignTable *ftable;
+ 	UserMapping *mapping;
+ 	Oid			tid_outfunc;
+ 	bool		tid_isvarlena;
+
+ 	/* Start with an all-zeroes state struct. */
+ 	state = (PgFdwFetchState *) palloc0(sizeof(PgFdwFetchState));
+ 	state->rel = frel;
+
+ 	/*
+ 	 * Identify which user to do the remote access as.  This should match
+ 	 * what ExecCheckRTEPerms() does.
+ 	 */
+ 	rtentry = rt_fetch(erm->rti, estate->es_range_table);
+ 	if (rtentry->checkAsUser)
+ 		remote_user = rtentry->checkAsUser;
+ 	else
+ 		remote_user = GetUserId();
+
+ 	/* Look up the foreign table and the user mapping for its server. */
+ 	ftable = GetForeignTable(RelationGetRelid(frel));
+ 	mapping = GetUserMapping(remote_user, ftable->serverid);
+
+ 	/* Open connection; report that we'll create a prepared statement. */
+ 	state->conn = GetConnection(mapping, true);
+ 	state->p_name = NULL;		/* prepared statement not made yet */
+
+ 	/* Pick the re-fetch query and its attribute list out of fdw_private. */
+ 	state->query = strVal(list_nth(fdw_private,
+ 								   FdwScanPrivateSelectSql2));
+ 	state->retrieved_attrs = (List *) list_nth(fdw_private,
+ 											   FdwScanPrivateRetrievedAttrs2);
+
+ 	/* Create context for per-tuple temp workspace. */
+ 	state->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ 											"postgres_fdw temporary data",
+ 											ALLOCSET_SMALL_MINSIZE,
+ 											ALLOCSET_SMALL_INITSIZE,
+ 											ALLOCSET_SMALL_MAXSIZE);
+
+ 	/* Prepare for input conversion of SELECT results. */
+ 	state->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(frel));
+
+ 	/*
+ 	 * Prepare for output conversion of the prepared statement's parameters.
+ 	 * The ctid is the only one, so a single FmgrInfo at index 0 suffices.
+ 	 */
+ 	state->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+ 	getTypeOutputInfo(TIDOID, &tid_outfunc, &tid_isvarlena);
+ 	fmgr_info(tid_outfunc, &state->p_flinfo[0]);
+ 	state->p_nums = 1;
+
+ 	/* Hand the finished state to the rowmark. */
+ 	erm->ermExtra = state;
+ }
+
+ /*
+ * finish_foreign_fetch_state
+ *		Finish an execution state for fetching/locking foreign rows
+ *
+ * Deallocates the remote prepared statement, if one was created, and
+ * releases the connection held by the state in erm->ermExtra.
+ *
+ * NOTE(review): erm->ermExtra is left pointing at the state struct, whose
+ * conn and p_name are now NULL.  Confirm that nothing can reach this
+ * rowmark's state again after teardown.
+ */
+ static void
+ finish_foreign_fetch_state(EState *estate, ExecRowMark *erm)
+ {
+ 	PgFdwFetchState *state = (PgFdwFetchState *) erm->ermExtra;
+
+ 	/* Drop the remote prepared statement if we ever made one. */
+ 	if (state->p_name != NULL)
+ 	{
+ 		char		cmd[64];
+ 		PGresult   *result;
+
+ 		snprintf(cmd, sizeof(cmd), "DEALLOCATE %s", state->p_name);
+
+ 		/*
+ 		 * No PG_TRY block here, so nothing between obtaining the PGresult
+ 		 * and releasing it may throw without releasing it first.
+ 		 */
+ 		result = PQexec(state->conn, cmd);
+ 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
+ 			pgfdw_report_error(ERROR, result, state->conn, true, cmd);
+ 		PQclear(result);
+
+ 		state->p_name = NULL;
+ 	}
+
+ 	/* Give the remote connection back. */
+ 	ReleaseConnection(state->conn);
+ 	state->conn = NULL;
+ }
+
/*
* postgresAnalyzeForeignTable
* Test whether analyzing this foreign table is supported
***************
*** 5099,5106 **** postgresGetForeignJoinPaths(PlannerInfo *root,
int width;
Cost startup_cost;
Cost total_cost;
! Path *epq_path; /* Path to create plan to be executed when
! * EvalPlanQual gets triggered. */
/*
* Skip if this join combination has been considered already.
--- 5445,5452 ----
int width;
Cost startup_cost;
Cost total_cost;
! Path *epq_path = NULL; /* Path to create plan to be executed when
! * EvalPlanQual gets triggered. */
/*
* Skip if this join combination has been considered already.
***************
*** 5108,5113 **** postgresGetForeignJoinPaths(PlannerInfo *root,
--- 5454,5468 ----
if (joinrel->fdw_private)
return;
+ /*
+ * Don't allow pushing down joins to the remote if there is a possibility
+ * that EvalPlanQual will be executed.
+ */
+ if (root->parse->commandType == CMD_DELETE ||
+ root->parse->commandType == CMD_UPDATE ||
+ root->rowMarks)
+ return;
+
/*
* This code does not work for joins with lateral references, since those
* must have parameterized paths, which we don't generate yet.
***************
*** 5128,5157 **** postgresGetForeignJoinPaths(PlannerInfo *root,
/* attrs_used is only for base relations. */
fpinfo->attrs_used = NULL;
- /*
- * If there is a possibility that EvalPlanQual will be executed, we need
- * to be able to reconstruct the row using scans of the base relations.
- * GetExistingLocalJoinPath will find a suitable path for this purpose in
- * the path list of the joinrel, if one exists. We must be careful to
- * call it before adding any ForeignPath, since the ForeignPath might
- * dominate the only suitable local path available. We also do it before
- * calling foreign_join_ok(), since that function updates fpinfo and marks
- * it as pushable if the join is found to be pushable.
- */
- if (root->parse->commandType == CMD_DELETE ||
- root->parse->commandType == CMD_UPDATE ||
- root->rowMarks)
- {
- epq_path = GetExistingLocalJoinPath(joinrel);
- if (!epq_path)
- {
- elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
- return;
- }
- }
- else
- epq_path = NULL;
-
if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
{
/* Free path required for EPQ if we copied one; we don't need it now */
--- 5483,5488 ----