On 2015/05/11 8:50, Tom Lane wrote:
> Etsuro Fujita <[email protected]> writes:
>> [ EvalPlanQual-v6.patch ]
>
> I've started to study this in a little more detail, and I'm not terribly
> happy with some of the API decisions in it.
Thanks for taking the time to review the patch!
> In particular, I find the addition of "void *fdw_state" to ExecRowMark
> to be pretty questionable. That does not seem like a good place to keep
> random state. (I realize that WHERE CURRENT OF keeps some state in
> ExecRowMark, but that's a crock not something to emulate.) ISTM that in
> most scenarios, the state that LockForeignRow/FetchForeignRow would like
> to get at is probably the FDW state associated with the ForeignScanState
> that the tuple came from. Which this API doesn't help with particularly.
> I wonder if we should instead add a "ScanState*" field and expect the
> core code to set that up (ExecOpenScanRelation could do it with minor
> API changes...).
Sorry, I don't understand clearly what you mean, but that (the idea of
expecting the core to set it up) sounds inconsistent with your comment
on the earlier version of the API "BeginForeignFetch" [1].
> I'm also a bit tempted to pass the TIDs to LockForeignRow and
> FetchForeignRow as Datums not ItemPointers. We have the Datum format
> available already at the call sites, so this is free as far as the core
> code is concerned, and would only cost another line or so for the FDWs.
> This is by no means sufficient to allow FDWs to use some other type than
> "tid" for row identifiers; but it would be a down payment on that problem,
> and at least would avoid nailing the rowids-are-tids assumption into yet
> another global API.
That is a good idea.
> Also, as I mentioned, I'd be a whole lot happier if we had a way to test
> this...
Attached is a postgres_fdw patch that I used for the testing. If you
try it, edit postgresGetForeignRowMarkType as necessary. I have to
confess that I did the testing only in the normal conditions by the patch.
Sorry for the delay. I took a vacation until yesterday.
Best regards,
Etsuro Fujita
[1] http://www.postgresql.org/message-id/[email protected]
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 88,93 **** typedef struct PgFdwRelationInfo
--- 88,95 ----
*
* 1) SELECT statement text to be sent to the remote server
* 2) Integer list of attribute numbers retrieved by the SELECT
+ * 3) SELECT statement text to be sent to the remote server
+ * 4) Integer list of attribute numbers retrieved by the SELECT
*
* These items are indexed with the enum FdwScanPrivateIndex, so an item
* can be fetched with list_nth(). For example, to get the SELECT statement:
***************
*** 98,104 **** enum FdwScanPrivateIndex
/* SQL statement to execute remotely (as a String node) */
FdwScanPrivateSelectSql,
/* Integer list of attribute numbers retrieved by the SELECT */
! FdwScanPrivateRetrievedAttrs
};
/*
--- 100,110 ----
/* SQL statement to execute remotely (as a String node) */
FdwScanPrivateSelectSql,
/* Integer list of attribute numbers retrieved by the SELECT */
! FdwScanPrivateRetrievedAttrs,
! /* SQL statement to execute remotely (as a String node) */
! FdwScanPrivateSelectSql2,
! /* Integer list of attribute numbers retrieved by SELECT */
! FdwScanPrivateRetrievedAttrs2
};
/*
***************
*** 186,191 **** typedef struct PgFdwModifyState
--- 192,223 ----
} PgFdwModifyState;
/*
+ * Execution state for fetching/locking foreign rows.
+ */
+ typedef struct PgFdwFetchState
+ {
+ Relation rel; /* relcache entry for the foreign table */
+ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
+
+ /* for remote query execution */
+ PGconn *conn; /* connection for the fetch */
+ char *p_name; /* name of prepared statement, if created */
+
+ /* extracted fdw_private data */
+ char *query; /* text of SELECT command */
+ List *retrieved_attrs; /* attr numbers retrieved by SELECT */
+
+ /* info about parameters for prepared statement */
+ int p_nums; /* number of parameters to transmit */
+ FmgrInfo *p_flinfo; /* output conversion functions for them */
+
+ HeapTuple locked_tuple;
+
+ /* working memory context */
+ MemoryContext temp_cxt; /* context for per-tuple temporary data */
+ } PgFdwFetchState;
+
+ /*
* Workspace for analyzing a foreign table.
*/
typedef struct PgFdwAnalyzeState
***************
*** 276,281 **** static TupleTableSlot *postgresExecForeignDelete(EState *estate,
--- 308,320 ----
static void postgresEndForeignModify(EState *estate,
ResultRelInfo *resultRelInfo);
static int postgresIsForeignRelUpdatable(Relation rel);
+ static RowMarkType postgresGetForeignRowMarkType(LockClauseStrength strength);
+ static bool postgresLockForeignRow(EState *estate,
+ ExecRowMark *erm,
+ ItemPointer tupleid);
+ static HeapTuple postgresFetchForeignRow(EState *estate,
+ ExecRowMark *erm,
+ ItemPointer tupleid);
static void postgresExplainForeignScan(ForeignScanState *node,
ExplainState *es);
static void postgresExplainForeignModify(ModifyTableState *mtstate,
***************
*** 306,320 **** static void get_remote_estimate(const char *sql,
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static void prepare_foreign_modify(PgFdwModifyState *fmstate);
! static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
! ItemPointer tupleid,
! TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
HeapTuple *rows, int targrows,
double *totalrows,
--- 345,370 ----
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
+ static List *create_foreign_fetch_info(PlannerInfo *root,
+ RelOptInfo *baserel,
+ RowMarkType markType);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static char *setup_prep_stmt(PGconn *conn, char *query);
! static const char **convert_prep_stmt_params(ItemPointer tupleid,
! TupleTableSlot *slot,
! int p_nums,
! FmgrInfo *p_flinfo,
! List *target_attrs,
! MemoryContext temp_context);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
+ static void init_foreign_fetch_state(EState *estate,
+ ExecRowMark *erm,
+ List *fdw_private,
+ int eflags);
+ static void finish_foreign_fetch_state(EState *estate, ExecRowMark *erm);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
HeapTuple *rows, int targrows,
double *totalrows,
***************
*** 358,363 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
--- 408,418 ----
routine->EndForeignModify = postgresEndForeignModify;
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
+ /* Functions for SELECT FOR UPDATE/SHARE row locking */
+ routine->GetForeignRowMarkType = postgresGetForeignRowMarkType;
+ routine->LockForeignRow = postgresLockForeignRow;
+ routine->FetchForeignRow = postgresFetchForeignRow;
+
/* Support functions for EXPLAIN */
routine->ExplainForeignScan = postgresExplainForeignScan;
routine->ExplainForeignModify = postgresExplainForeignModify;
***************
*** 746,751 **** postgresGetForeignPlan(PlannerInfo *root,
--- 801,807 ----
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
Index scan_relid = baserel->relid;
List *fdw_private;
+ List *fdw_private2 = NIL;
List *remote_conds = NIL;
List *local_exprs = NIL;
List *params_list = NIL;
***************
*** 836,855 **** postgresGetForeignPlan(PlannerInfo *root,
* complete information about, and (b) it wouldn't work anyway on
* older remote servers. Likewise, we don't worry about NOWAIT.
*/
! switch (rc->strength)
{
! case LCS_NONE:
! /* No locking needed */
! break;
! case LCS_FORKEYSHARE:
! case LCS_FORSHARE:
! appendStringInfoString(&sql, " FOR SHARE");
! break;
! case LCS_FORNOKEYUPDATE:
! case LCS_FORUPDATE:
! appendStringInfoString(&sql, " FOR UPDATE");
! break;
}
}
}
--- 892,917 ----
* complete information about, and (b) it wouldn't work anyway on
* older remote servers. Likewise, we don't worry about NOWAIT.
*/
! if (rc->markType == ROW_MARK_COPY)
{
! switch (rc->strength)
! {
! case LCS_NONE:
! /* No locking needed */
! break;
! case LCS_FORKEYSHARE:
! case LCS_FORSHARE:
! appendStringInfoString(&sql, " FOR SHARE");
! break;
! case LCS_FORNOKEYUPDATE:
! case LCS_FORUPDATE:
! appendStringInfoString(&sql, " FOR UPDATE");
! break;
! }
}
+ else
+ fdw_private2 = create_foreign_fetch_info(root, baserel,
+ rc->markType);
}
}
***************
*** 859,864 **** postgresGetForeignPlan(PlannerInfo *root,
--- 921,928 ----
*/
fdw_private = list_make2(makeString(sql.data),
retrieved_attrs);
+ if (fdw_private2)
+ fdw_private = list_concat(fdw_private, fdw_private2);
/*
* Create the ForeignScan node from target list, local filtering
***************
*** 886,891 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 950,956 ----
EState *estate = node->ss.ps.state;
PgFdwScanState *fsstate;
RangeTblEntry *rte;
+ ExecRowMark *erm;
Oid userid;
ForeignTable *table;
ForeignServer *server;
***************
*** 986,991 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 1051,1063 ----
fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
else
fsstate->param_values = NULL;
+
+ /*
+ * Initialize state for fetching/locking foreign rows if needed.
+ */
+ erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ if (erm && erm->relation && erm->fdw_state == NULL)
+ init_foreign_fetch_state(estate, erm, fsplan->fdw_private, eflags);
}
/*
***************
*** 1093,1099 **** postgresReScanForeignScan(ForeignScanState *node)
--- 1165,1174 ----
static void
postgresEndForeignScan(ForeignScanState *node)
{
+ ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+ EState *estate = node->ss.ps.state;
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ ExecRowMark *erm;
/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
if (fsstate == NULL)
***************
*** 1107,1112 **** postgresEndForeignScan(ForeignScanState *node)
--- 1182,1194 ----
ReleaseConnection(fsstate->conn);
fsstate->conn = NULL;
+ /*
+ * Finish state for fetching/locking foreign rows if needed.
+ */
+ erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ if (erm && erm->relation && erm->fdw_state != NULL)
+ finish_foreign_fetch_state(estate, erm);
+
/* MemoryContexts will be deleted automatically. */
}
***************
*** 1391,1400 **** postgresExecForeignInsert(EState *estate,
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! prepare_foreign_modify(fmstate);
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(fmstate, NULL, slot);
/*
* Execute the prepared statement, and check for success.
--- 1473,1486 ----
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(NULL, slot,
! fmstate->p_nums,
! fmstate->p_flinfo,
! fmstate->target_attrs,
! fmstate->temp_cxt);
/*
* Execute the prepared statement, and check for success.
***************
*** 1451,1457 **** postgresExecForeignUpdate(EState *estate,
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! prepare_foreign_modify(fmstate);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
--- 1537,1543 ----
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
***************
*** 1462,1470 **** postgresExecForeignUpdate(EState *estate,
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(fmstate,
! (ItemPointer) DatumGetPointer(datum),
! slot);
/*
* Execute the prepared statement, and check for success.
--- 1548,1559 ----
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! slot,
! fmstate->p_nums,
! fmstate->p_flinfo,
! fmstate->target_attrs,
! fmstate->temp_cxt);
/*
* Execute the prepared statement, and check for success.
***************
*** 1521,1527 **** postgresExecForeignDelete(EState *estate,
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! prepare_foreign_modify(fmstate);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
--- 1610,1616 ----
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
***************
*** 1532,1540 **** postgresExecForeignDelete(EState *estate,
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(fmstate,
! (ItemPointer) DatumGetPointer(datum),
! NULL);
/*
* Execute the prepared statement, and check for success.
--- 1621,1632 ----
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! NULL,
! fmstate->p_nums,
! fmstate->p_flinfo,
! fmstate->target_attrs,
! fmstate->temp_cxt);
/*
* Execute the prepared statement, and check for success.
***************
*** 1656,1661 **** postgresIsForeignRelUpdatable(Relation rel)
--- 1748,1927 ----
}
/*
+ * postgresGetForeignRowMarkType
+ * Get rowmark type that we use for a given LockClauseStrength value.
+ */
+ static RowMarkType
+ postgresGetForeignRowMarkType(LockClauseStrength strength)
+ {
+ /* return ROW_MARK_COPY; */
+ switch (strength)
+ {
+ case LCS_NONE:
+ return ROW_MARK_REFERENCE;
+ case LCS_FORKEYSHARE:
+ return ROW_MARK_KEYSHARE;
+ case LCS_FORSHARE:
+ return ROW_MARK_SHARE;
+ case LCS_FORNOKEYUPDATE:
+ return ROW_MARK_NOKEYEXCLUSIVE;
+ case LCS_FORUPDATE:
+ return ROW_MARK_EXCLUSIVE;
+ }
+ return ROW_MARK_COPY; /* shouldn't happen */
+ }
+
+ /*
+ * postgresLockForeignRow
+ * Lock one tuple in a foreign table
+ */
+ static bool
+ postgresLockForeignRow(EState *estate,
+ ExecRowMark *erm,
+ ItemPointer tupleid)
+ {
+ PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+ const char **p_values;
+ PGresult *res;
+ HeapTuple tuple;
+
+ ffstate->locked_tuple = NULL;
+
+ /* Set up the prepared statement on the remote server, if we didn't yet */
+ if (!ffstate->p_name)
+ ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+
+ /* Convert parameters needed by prepared statement to text form */
+ p_values = convert_prep_stmt_params(tupleid, NULL,
+ ffstate->p_nums,
+ ffstate->p_flinfo,
+ NIL,
+ ffstate->temp_cxt);
+
+ /*
+ * Execute the prepared statement, and check for success.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexecPrepared(ffstate->conn,
+ ffstate->p_name,
+ ffstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+
+ /* PGresult must be released before leaving this function. */
+ PG_TRY();
+ {
+ /* Create the tuple */
+ tuple = make_tuple_from_result_row(res, 0,
+ ffstate->rel,
+ ffstate->attinmeta,
+ ffstate->retrieved_attrs,
+ ffstate->temp_cxt);
+ tuple->t_self = *tupleid;
+ tuple->t_tableOid = erm->relid;
+
+ PQclear(res);
+ res = NULL;
+ }
+ PG_CATCH();
+ {
+ if (res)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ MemoryContextReset(ffstate->temp_cxt);
+
+ /* Remember locked tuple for later processing */
+ ffstate->locked_tuple = tuple;
+
+ /* Got the lock successfully */
+ return true;
+ }
+
+ /*
+ * postgresFetchForeignRow
+ * Fetch one tuple from a foreign table
+ */
+ static HeapTuple
+ postgresFetchForeignRow(EState *estate,
+ ExecRowMark *erm,
+ ItemPointer tupleid)
+ {
+ PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+ const char **p_values;
+ PGresult *res;
+ HeapTuple tuple;
+
+ if (RowMarkRequiresRowShareLock(erm->markType))
+ {
+ Assert(ffstate->locked_tuple);
+ return ffstate->locked_tuple;
+ }
+
+ /* Set up the prepared statement on the remote server, if we didn't yet */
+ if (!ffstate->p_name)
+ ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+
+ /* Convert parameters needed by prepared statement to text form */
+ p_values = convert_prep_stmt_params(tupleid, NULL,
+ ffstate->p_nums,
+ ffstate->p_flinfo,
+ NIL,
+ ffstate->temp_cxt);
+
+ /*
+ * Execute the prepared statement, and check for success.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexecPrepared(ffstate->conn,
+ ffstate->p_name,
+ ffstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+
+ /* PGresult must be released before leaving this function. */
+ PG_TRY();
+ {
+ /* Create the tuple */
+ tuple = make_tuple_from_result_row(res, 0,
+ ffstate->rel,
+ ffstate->attinmeta,
+ ffstate->retrieved_attrs,
+ ffstate->temp_cxt);
+ tuple->t_self = *tupleid;
+ tuple->t_tableOid = erm->relid;
+
+ PQclear(res);
+ res = NULL;
+ }
+ PG_CATCH();
+ {
+ if (res)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ MemoryContextReset(ffstate->temp_cxt);
+
+ return tuple;
+ }
+
+ /*
* postgresExplainForeignScan
* Produce extra output for EXPLAIN of a ForeignScan on a foreign table
*/
***************
*** 1918,1923 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 2184,2232 ----
}
/*
+ * Create the FDW-private information for fetching/locking foreign rows.
+ */
+ static List *
+ create_foreign_fetch_info(PlannerInfo *root,
+ RelOptInfo *baserel,
+ RowMarkType markType)
+ {
+ StringInfoData sql;
+ List *retrieved_attrs;
+ Bitmapset *attrs_used = NULL;
+
+ /*
+ * Build the query string to be sent for execution.
+ */
+ initStringInfo(&sql);
+ /* Add a whole-row var to attrs_used to retrieve all the columns. */
+ attrs_used = bms_add_member(attrs_used,
+ 0 - FirstLowInvalidHeapAttributeNumber);
+ deparseSelectSql(&sql, root, baserel, attrs_used, &retrieved_attrs);
+ appendStringInfoString(&sql, " WHERE ctid = $1");
+
+ switch (markType)
+ {
+ case ROW_MARK_EXCLUSIVE:
+ case ROW_MARK_NOKEYEXCLUSIVE:
+ appendStringInfoString(&sql, " FOR UPDATE");
+ break;
+ case ROW_MARK_SHARE:
+ case ROW_MARK_KEYSHARE:
+ appendStringInfoString(&sql, " FOR SHARE");
+ break;
+ default:
+ break;
+ }
+
+ /*
+ * Build the fdw_private list that will be available to the executor.
+ * Items in the list must match enum FdwFetchPrivateIndex, above.
+ */
+ return list_make2(makeString(sql.data), retrieved_attrs);
+ }
+
+ /*
* Create cursor for node's query with current parameter values.
*/
static void
***************
*** 2154,2164 **** close_cursor(PGconn *conn, unsigned int cursor_number)
}
/*
! * prepare_foreign_modify
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
*/
! static void
! prepare_foreign_modify(PgFdwModifyState *fmstate)
{
char prep_name[NAMEDATALEN];
char *p_name;
--- 2463,2474 ----
}
/*
! * setup_prep_stmt
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
+ * or re-fetching tuples for EvalPlanQual rechecking
*/
! static char *
! setup_prep_stmt(PGconn *conn, char *query)
{
char prep_name[NAMEDATALEN];
char *p_name;
***************
*** 2166,2172 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! GetPrepStmtNumber(fmstate->conn));
p_name = pstrdup(prep_name);
/*
--- 2476,2482 ----
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! GetPrepStmtNumber(conn));
p_name = pstrdup(prep_name);
/*
***************
*** 2179,2196 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
! res = PQprepare(fmstate->conn,
! p_name,
! fmstate->query,
! 0,
! NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
! pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
PQclear(res);
/* This action shows that the prepare has been done. */
! fmstate->p_name = p_name;
}
/*
--- 2489,2502 ----
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
! res = PQprepare(conn, p_name, query, 0, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
! pgfdw_report_error(ERROR, res, conn, true, query);
PQclear(res);
/* This action shows that the prepare has been done. */
! return p_name;
}
/*
***************
*** 2203,2238 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
* Data is constructed in temp_cxt; caller should reset that after use.
*/
static const char **
! convert_prep_stmt_params(PgFdwModifyState *fmstate,
! ItemPointer tupleid,
! TupleTableSlot *slot)
{
const char **p_values;
int pindex = 0;
MemoryContext oldcontext;
! oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
! p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
/* 1st parameter should be ctid, if it's in use */
if (tupleid != NULL)
{
/* don't need set_transmission_modes for TID output */
! p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
}
/* get following parameters from slot */
! if (slot != NULL && fmstate->target_attrs != NIL)
{
int nestlevel;
ListCell *lc;
nestlevel = set_transmission_modes();
! foreach(lc, fmstate->target_attrs)
{
int attnum = lfirst_int(lc);
Datum value;
--- 2509,2547 ----
* Data is constructed in temp_cxt; caller should reset that after use.
*/
static const char **
! convert_prep_stmt_params(ItemPointer tupleid,
! TupleTableSlot *slot,
! int p_nums,
! FmgrInfo *p_flinfo,
! List *target_attrs,
! MemoryContext temp_context)
{
const char **p_values;
int pindex = 0;
MemoryContext oldcontext;
! oldcontext = MemoryContextSwitchTo(temp_context);
! p_values = (const char **) palloc(sizeof(char *) * p_nums);
/* 1st parameter should be ctid, if it's in use */
if (tupleid != NULL)
{
/* don't need set_transmission_modes for TID output */
! p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
}
/* get following parameters from slot */
! if (slot != NULL && target_attrs != NIL)
{
int nestlevel;
ListCell *lc;
nestlevel = set_transmission_modes();
! foreach(lc, target_attrs)
{
int attnum = lfirst_int(lc);
Datum value;
***************
*** 2242,2248 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
if (isnull)
p_values[pindex] = NULL;
else
! p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
value);
pindex++;
}
--- 2551,2557 ----
if (isnull)
p_values[pindex] = NULL;
else
! p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
value);
pindex++;
}
***************
*** 2250,2256 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
reset_transmission_modes(nestlevel);
}
! Assert(pindex == fmstate->p_nums);
MemoryContextSwitchTo(oldcontext);
--- 2559,2565 ----
reset_transmission_modes(nestlevel);
}
! Assert(pindex == p_nums);
MemoryContextSwitchTo(oldcontext);
***************
*** 2290,2295 **** store_returning_result(PgFdwModifyState *fmstate,
--- 2599,2705 ----
}
/*
+ * init_foreign_fetch_state
+ * Initialize an execution state for fetching/locking foreign rows
+ */
+ static void
+ init_foreign_fetch_state(EState *estate,
+ ExecRowMark *erm,
+ List *fdw_private,
+ int eflags)
+ {
+ PgFdwFetchState *ffstate;
+ Relation rel = erm->relation;
+ RangeTblEntry *rte;
+ Oid userid;
+ ForeignTable *table;
+ ForeignServer *server;
+ UserMapping *user;
+ Oid typefnoid;
+ bool isvarlena;
+
+ /* Begin constructing PgFdwFetchState. */
+ ffstate = (PgFdwFetchState *) palloc0(sizeof(PgFdwFetchState));
+ ffstate->rel = rel;
+
+ /*
+ * Identify which user to do the remote access as. This should match what
+ * ExecCheckRTEPerms() does.
+ */
+ rte = rt_fetch(erm->rti, estate->es_range_table);
+ userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+ /* Get info about foreign table. */
+ table = GetForeignTable(RelationGetRelid(rel));
+ server = GetForeignServer(table->serverid);
+ user = GetUserMapping(userid, server->serverid);
+
+ /* Open connection; report that we'll create a prepared statement. */
+ ffstate->conn = GetConnection(server, user, true);
+ ffstate->p_name = NULL; /* prepared statement not made yet */
+
+ /* Deconstruct fdw_private data. */
+ ffstate->query = strVal(list_nth(fdw_private,
+ FdwScanPrivateSelectSql2));
+ ffstate->retrieved_attrs = (List *) list_nth(fdw_private,
+ FdwScanPrivateRetrievedAttrs2);
+
+ /* Create context for per-tuple temp workspace. */
+ ffstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ "postgres_fdw temporary data",
+ ALLOCSET_SMALL_MINSIZE,
+ ALLOCSET_SMALL_INITSIZE,
+ ALLOCSET_SMALL_MAXSIZE);
+
+ /* Prepare for input conversion of SELECT results. */
+ ffstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
+
+ /* Prepare for output conversion of parameters used in prepared stmt. */
+ ffstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+ ffstate->p_nums = 0;
+
+ /* Only one transmittable parameter will be ctid */
+ getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ fmgr_info(typefnoid, &ffstate->p_flinfo[ffstate->p_nums]);
+ ffstate->p_nums++;
+
+ erm->fdw_state = ffstate;
+ }
+
+ /*
+ * finish_foreign_fetch_state
+ * Finish an execution state for fetching/locking foreign rows
+ */
+ static void
+ finish_foreign_fetch_state(EState *estate, ExecRowMark *erm)
+ {
+ PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+
+ /* If we created a prepared statement, destroy it */
+ if (ffstate->p_name)
+ {
+ char sql[64];
+ PGresult *res;
+
+ snprintf(sql, sizeof(sql), "DEALLOCATE %s", ffstate->p_name);
+
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexec(ffstate->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, ffstate->conn, true, sql);
+ PQclear(res);
+ ffstate->p_name = NULL;
+ }
+
+ /* Release remote connection */
+ ReleaseConnection(ffstate->conn);
+ ffstate->conn = NULL;
+ }
+
+ /*
* postgresAnalyzeForeignTable
* Test whether analyzing this foreign table is supported
*/
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers