On 2016/03/29 15:37, Etsuro Fujita wrote:
I added two helper functions: GetFdwScanTupleExtraData and FillFdwScanTupleSysAttrs. An FDW author can use the former during BeginForeignScan to get information about the system attributes other than ctid and oid that appear in fdw_scan_tlist, and the latter during IterateForeignScan to set values for those system attributes (InvalidTransactionId for xmin/xmax, InvalidCommandId for cmin/cmax, and valid values for tableoid, i.e. the relation OID of the range table entry each Var refers to). Attached is a proposed patch for that. I also slightly simplified the changes to make_tuple_from_result_row and conversion_error_callback that were made by the postgres_fdw join pushdown patch. What do you think about that?
I revised the comments a little bit. Attached is an updated version of the patch. I think this issue should be fixed before the PostgreSQL 9.6beta1 release.
Best regards, Etsuro Fujita
*** a/contrib/postgres_fdw/deparse.c --- b/contrib/postgres_fdw/deparse.c *************** *** 1091,1130 **** get_jointype_name(JoinType jointype) * * tlist is list of TargetEntry's which in turn contain Var nodes. * ! * retrieved_attrs is the list of continuously increasing integers starting ! * from 1. It has same number of entries as tlist. */ static void deparseExplicitTargetList(List *tlist, List **retrieved_attrs, deparse_expr_cxt *context) { - ListCell *lc; StringInfo buf = context->buf; ! int i = 0; *retrieved_attrs = NIL; foreach(lc, tlist) { TargetEntry *tle = (TargetEntry *) lfirst(lc); Var *var; - /* Extract expression if TargetEntry node */ Assert(IsA(tle, TargetEntry)); var = (Var *) tle->expr; /* We expect only Var nodes here */ Assert(IsA(var, Var)); ! if (i > 0) ! appendStringInfoString(buf, ", "); ! deparseVar(var, context); ! *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1); i++; } ! if (i == 0) appendStringInfoString(buf, "NULL"); } --- 1091,1142 ---- * * tlist is list of TargetEntry's which in turn contain Var nodes. * ! * System columns other than ctid and oid are not retrieved from the remote ! * server, since we set values for such columns locally. ! * ! * We create an integer List of the columns being retrieved, which is returned ! * to *retrieved_attrs. */ static void deparseExplicitTargetList(List *tlist, List **retrieved_attrs, deparse_expr_cxt *context) { StringInfo buf = context->buf; ! ListCell *lc; ! bool first; ! int i; *retrieved_attrs = NIL; + i = 1; + first = true; foreach(lc, tlist) { TargetEntry *tle = (TargetEntry *) lfirst(lc); Var *var; Assert(IsA(tle, TargetEntry)); var = (Var *) tle->expr; /* We expect only Var nodes here */ Assert(IsA(var, Var)); ! if (var->varattno >= 0 || ! var->varattno == SelfItemPointerAttributeNumber || ! var->varattno == ObjectIdAttributeNumber) ! { ! if (!first) ! appendStringInfoString(buf, ", "); ! first = false; ! deparseVar(var, context); ! ! 
*retrieved_attrs = lappend_int(*retrieved_attrs, i); ! } i++; } ! if (first) appendStringInfoString(buf, "NULL"); } *** a/contrib/postgres_fdw/expected/postgres_fdw.out --- b/contrib/postgres_fdw/expected/postgres_fdw.out *************** *** 1923,1928 **** SELECT t1."C 1" FROM "S 1"."T 1" t1, LATERAL (SELECT DISTINCT t2.c1, t3.c1 FROM --- 1923,1956 ---- 1 (10 rows) + -- System columns, except ctid and oid, should not be retrieved from remote + EXPLAIN (COSTS false, VERBOSE) + SELECT t1.tableoid::regclass, t1.c1, t2.tableoid::regclass, t2.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; + QUERY PLAN + ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: ((t1.tableoid)::regclass), t1.c1, ((t2.tableoid)::regclass), t2.c1, t1.c3 + -> Foreign Scan + Output: (t1.tableoid)::regclass, t1.c1, (t2.tableoid)::regclass, t2.c1, t1.c3 + Relations: (public.ft1 t1) INNER JOIN (public.ft2 t2) + Remote SQL: SELECT r1."C 1", r1.c3, r2."C 1" FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) WHERE ((r1."C 1" = r2."C 1")) ORDER BY r1.c3 ASC NULLS LAST, r1."C 1" ASC NULLS LAST + (6 rows) + + SELECT t1.tableoid::regclass, t1.c1, t2.tableoid::regclass, t2.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; + tableoid | c1 | tableoid | c1 + ----------+-----+----------+----- + ft1 | 101 | ft2 | 101 + ft1 | 102 | ft2 | 102 + ft1 | 103 | ft2 | 103 + ft1 | 104 | ft2 | 104 + ft1 | 105 | ft2 | 105 + ft1 | 106 | ft2 | 106 + ft1 | 107 | ft2 | 107 + ft1 | 108 | ft2 | 108 + ft1 | 109 | ft2 | 109 + ft1 | 110 | ft2 | 110 + (10 rows) + -- create another user for permission, user mapping, effective user tests CREATE USER view_owner; -- grant privileges on ft4 and ft5 to view_owner *** a/contrib/postgres_fdw/postgres_fdw.c --- 
b/contrib/postgres_fdw/postgres_fdw.c *************** *** 127,134 **** typedef struct PgFdwScanState { Relation rel; /* relcache entry for the foreign table. NULL * for a foreign join scan. */ - TupleDesc tupdesc; /* tuple descriptor of scan */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* extracted fdw_private data */ char *query; /* text of SELECT command */ --- 127,134 ---- { Relation rel; /* relcache entry for the foreign table. NULL * for a foreign join scan. */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + FdwScanTupleExtraData *extra; /* info about result tup, if foreign join */ /* extracted fdw_private data */ char *query; /* text of SELECT command */ *************** *** 246,260 **** typedef struct PgFdwAnalyzeState typedef struct ConversionLocation { Relation rel; /* foreign table's relcache entry. */ AttrNumber cur_attno; /* attribute number being processed, or 0 */ - - /* - * In case of foreign join push down, fdw_scan_tlist is used to identify - * the Var node corresponding to the error location and - * fsstate->ss.ps.state gives access to the RTEs of corresponding relation - * to get the relation name and attribute name. - */ - ForeignScanState *fsstate; } ConversionLocation; /* Callback argument for ec_member_matches_foreign */ --- 246,253 ---- typedef struct ConversionLocation { Relation rel; /* foreign table's relcache entry. 
*/ + FdwScanTupleExtraData *extra; /* info about result tup, if foreign join */ AttrNumber cur_attno; /* attribute number being processed, or 0 */ } ConversionLocation; /* Callback argument for ec_member_matches_foreign */ *************** *** 395,402 **** static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, - ForeignScanState *fsstate, MemoryContext temp_context); static void conversion_error_callback(void *arg); static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, --- 388,395 ---- int row, Relation rel, AttInMetadata *attinmeta, + FdwScanTupleExtraData *extra, List *retrieved_attrs, MemoryContext temp_context); static void conversion_error_callback(void *arg); static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, *************** *** 1235,1240 **** postgresBeginForeignScan(ForeignScanState *node, int eflags) --- 1228,1234 ---- EState *estate = node->ss.ps.state; PgFdwScanState *fsstate; UserMapping *user; + TupleDesc tupdesc; int numParams; /* *************** *** 1316,1326 **** postgresBeginForeignScan(ForeignScanState *node, int eflags) * into local representation and error reporting during that process. */ if (fsplan->scan.scanrelid > 0) ! fsstate->tupdesc = RelationGetDescr(fsstate->rel); else ! fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; ! fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); /* * Prepare for processing of parameters used in remote query, if any. --- 1310,1327 ---- * into local representation and error reporting during that process. */ if (fsplan->scan.scanrelid > 0) ! tupdesc = RelationGetDescr(fsstate->rel); else ! tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; ! fsstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); ! ! /* Create extra info for making scan tuples, if foreign join */ ! if (fsplan->scan.scanrelid > 0) ! fsstate->extra = NULL; ! else ! 
fsstate->extra = GetFdwScanTupleExtraData(fsplan->fdw_scan_tlist, ! estate->es_range_table); /* * Prepare for processing of parameters used in remote query, if any. *************** *** 2887,2894 **** fetch_more_data(ForeignScanState *node) make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, fsstate->retrieved_attrs, - node, fsstate->temp_cxt); } --- 2888,2895 ---- make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, + fsstate->extra, fsstate->retrieved_attrs, fsstate->temp_cxt); } *************** *** 3106,3113 **** store_returning_result(PgFdwModifyState *fmstate, newtup = make_tuple_from_result_row(res, 0, fmstate->rel, fmstate->attinmeta, - fmstate->retrieved_attrs, NULL, fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); --- 3107,3114 ---- newtup = make_tuple_from_result_row(res, 0, fmstate->rel, fmstate->attinmeta, NULL, + fmstate->retrieved_attrs, fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); *************** *** 3207,3214 **** get_returning_data(ForeignScanState *node) dmstate->next_tuple, dmstate->rel, dmstate->attinmeta, - dmstate->retrieved_attrs, NULL, dmstate->temp_cxt); ExecStoreTuple(newtup, slot, InvalidBuffer, false); } --- 3208,3215 ---- dmstate->next_tuple, dmstate->rel, dmstate->attinmeta, NULL, + dmstate->retrieved_attrs, dmstate->temp_cxt); ExecStoreTuple(newtup, slot, InvalidBuffer, false); } *************** *** 3609,3616 **** analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) astate->rows[pos] = make_tuple_from_result_row(res, row, astate->rel, astate->attinmeta, - astate->retrieved_attrs, NULL, astate->temp_cxt); MemoryContextSwitchTo(oldcontext); --- 3610,3617 ---- astate->rows[pos] = make_tuple_from_result_row(res, row, astate->rel, astate->attinmeta, NULL, + astate->retrieved_attrs, astate->temp_cxt); 
MemoryContextSwitchTo(oldcontext); *************** *** 4289,4296 **** make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, - ForeignScanState *fsstate, MemoryContext temp_context) { HeapTuple tuple; --- 4290,4297 ---- int row, Relation rel, AttInMetadata *attinmeta, + FdwScanTupleExtraData *extra, List *retrieved_attrs, MemoryContext temp_context) { HeapTuple tuple; *************** *** 4317,4327 **** make_tuple_from_result_row(PGresult *res, tupdesc = RelationGetDescr(rel); else { ! PgFdwScanState *fdw_sstate; ! ! Assert(fsstate); ! fdw_sstate = (PgFdwScanState *) fsstate->fdw_state; ! tupdesc = fdw_sstate->tupdesc; } values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); --- 4318,4325 ---- tupdesc = RelationGetDescr(rel); else { ! Assert(extra); ! tupdesc = attinmeta->tupdesc; } values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); *************** *** 4333,4340 **** make_tuple_from_result_row(PGresult *res, * Set up and install callback to report where conversion error occurs. */ errpos.rel = rel; errpos.cur_attno = 0; - errpos.fsstate = fsstate; errcallback.callback = conversion_error_callback; errcallback.arg = (void *) &errpos; errcallback.previous = error_context_stack; --- 4331,4338 ---- * Set up and install callback to report where conversion error occurs. */ errpos.rel = rel; + errpos.extra = extra; errpos.cur_attno = 0; errcallback.callback = conversion_error_callback; errcallback.arg = (void *) &errpos; errcallback.previous = error_context_stack; *************** *** 4399,4404 **** make_tuple_from_result_row(PGresult *res, --- 4397,4408 ---- */ MemoryContextSwitchTo(oldcontext); + /* + * Set values for xmins/xmaxs, cmins/cmaxs, and tableoids, if foreign join. 
+ */ + if (extra && extra->has_sys_attrs) + FillFdwScanTupleSysAttrs(extra, values, nulls); + tuple = heap_form_tuple(tupdesc, values, nulls); /* *************** *** 4442,4462 **** conversion_error_callback(void *arg) else { /* error occurred in a scan against a foreign join */ - ForeignScanState *fsstate = errpos->fsstate; - ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan; - EState *estate = fsstate->ss.ps.state; TargetEntry *tle; Var *var; RangeTblEntry *rte; ! Assert(IsA(fsplan, ForeignScan)); ! tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist, errpos->cur_attno - 1); Assert(IsA(tle, TargetEntry)); var = (Var *) tle->expr; Assert(IsA(var, Var)); ! rte = rt_fetch(var->varno, estate->es_range_table); relname = get_rel_name(rte->relid); attname = get_relid_attribute_name(rte->relid, var->varattno); } --- 4446,4463 ---- else { /* error occurred in a scan against a foreign join */ TargetEntry *tle; Var *var; RangeTblEntry *rte; ! Assert(errpos->cur_attno > 0); ! tle = (TargetEntry *) list_nth(errpos->extra->fdw_scan_tlist, errpos->cur_attno - 1); Assert(IsA(tle, TargetEntry)); var = (Var *) tle->expr; Assert(IsA(var, Var)); ! 
rte = rt_fetch(var->varno, errpos->extra->range_table); relname = get_rel_name(rte->relid); attname = get_relid_attribute_name(rte->relid, var->varattno); } *** a/contrib/postgres_fdw/sql/postgres_fdw.sql --- b/contrib/postgres_fdw/sql/postgres_fdw.sql *************** *** 466,471 **** SELECT t1c1, avg(t1c1 + t2c1) FROM (SELECT t1.c1, t2.c1 FROM ft1 t1 JOIN ft2 t2 --- 466,475 ---- EXPLAIN (COSTS false, VERBOSE) SELECT t1."C 1" FROM "S 1"."T 1" t1, LATERAL (SELECT DISTINCT t2.c1, t3.c1 FROM ft1 t2, ft2 t3 WHERE t2.c1 = t3.c1 AND t2.c2 = t1.c2) q ORDER BY t1."C 1" OFFSET 10 LIMIT 10; SELECT t1."C 1" FROM "S 1"."T 1" t1, LATERAL (SELECT DISTINCT t2.c1, t3.c1 FROM ft1 t2, ft2 t3 WHERE t2.c1 = t3.c1 AND t2.c2 = t1.c2) q ORDER BY t1."C 1" OFFSET 10 LIMIT 10; + -- System columns, except ctid and oid, should not be retrieved from remote + EXPLAIN (COSTS false, VERBOSE) + SELECT t1.tableoid::regclass, t1.c1, t2.tableoid::regclass, t2.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; + SELECT t1.tableoid::regclass, t1.c1, t2.tableoid::regclass, t2.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; -- create another user for permission, user mapping, effective user tests CREATE USER view_owner; *** a/src/backend/foreign/foreign.c --- b/src/backend/foreign/foreign.c *************** *** 14,19 **** --- 14,20 ---- #include "access/htup_details.h" #include "access/reloptions.h" + #include "access/sysattr.h" #include "catalog/pg_foreign_data_wrapper.h" #include "catalog/pg_foreign_server.h" #include "catalog/pg_foreign_table.h" *************** *** 22,27 **** --- 23,29 ---- #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "miscadmin.h" + #include "parser/parsetree.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/rel.h" *************** *** 909,911 **** GetExistingLocalJoinPath(RelOptInfo *joinrel) --- 911,1048 ---- } return NULL; } + + /* + * Get extra information 
for making scan tuples described by fdw_scan_tlist + * + * Note: we assume that an expression in each TargetEntry in fdw_scan_tlist + * is a simple Var. + */ + FdwScanTupleExtraData * + GetFdwScanTupleExtraData(List *fdw_scan_tlist, List *range_table) + { + FdwScanTupleExtraData *extra; + List *xmin_attrs = NIL; + List *cmin_attrs = NIL; + List *xmax_attrs = NIL; + List *cmax_attrs = NIL; + List *tableoid_attrs = NIL; + List *tableoid_values = NIL; + RangeTblEntry *rte; + ListCell *lc; + int i; + + i = 1; + foreach(lc, fdw_scan_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Var *var = (Var *) tle->expr; + + if (!IsA(var, Var)) + elog(ERROR, "unexpected node type in fdw_scan_tlist target entry: %d", + (int) nodeTag(var)); + + if (var->varattno < 0) + { + switch (var->varattno) + { + case SelfItemPointerAttributeNumber: + case ObjectIdAttributeNumber: + break; + case MinTransactionIdAttributeNumber: + xmin_attrs = lappend_int(xmin_attrs, i); + break; + case MinCommandIdAttributeNumber: + cmin_attrs = lappend_int(cmin_attrs, i); + break; + case MaxTransactionIdAttributeNumber: + xmax_attrs = lappend_int(xmax_attrs, i); + break; + case MaxCommandIdAttributeNumber: + cmax_attrs = lappend_int(cmax_attrs, i); + break; + case TableOidAttributeNumber: + rte = rt_fetch(var->varno, range_table); + tableoid_attrs = lappend_int(tableoid_attrs, i); + tableoid_values = lappend_oid(tableoid_values, rte->relid); + break; + default: + elog(ERROR, "invalid attnum: %d", var->varattno); + break; + } + } + + i++; + } + + extra = (FdwScanTupleExtraData *) palloc0(sizeof(FdwScanTupleExtraData)); + extra->fdw_scan_tlist = fdw_scan_tlist; + if (xmin_attrs || cmin_attrs || xmax_attrs || cmax_attrs || tableoid_attrs) + extra->has_sys_attrs = true; + else + extra->has_sys_attrs = false; + extra->xmin_attrs = xmin_attrs; + extra->cmin_attrs = cmin_attrs; + extra->xmax_attrs = xmax_attrs; + extra->cmax_attrs = cmax_attrs; + extra->tableoid_attrs = tableoid_attrs; + 
extra->tableoid_values = tableoid_values; + extra->range_table = range_table; + + return extra; + } + + /* + * Set values for xmins/xmaxs, cmins/cmaxs, and tableoids in a given tuple. + */ + void + FillFdwScanTupleSysAttrs(FdwScanTupleExtraData *extra, + Datum *values, bool *nulls) + { + ListCell *lc; + ListCell *lc2; + + Assert(extra); + + /* Fill xmins/xmaxs with InvalidTransactionId */ + foreach(lc, extra->xmin_attrs) + { + int i = lfirst_int(lc); + + nulls[i - 1] = false; + values[i - 1] = TransactionIdGetDatum(InvalidTransactionId); + } + foreach(lc, extra->xmax_attrs) + { + int i = lfirst_int(lc); + + nulls[i - 1] = false; + values[i - 1] = TransactionIdGetDatum(InvalidTransactionId); + } + + /* Fill cmins/cmaxs with InvalidCommandId */ + foreach(lc, extra->cmin_attrs) + { + int i = lfirst_int(lc); + + nulls[i - 1] = false; + values[i - 1] = CommandIdGetDatum(InvalidCommandId); + } + foreach(lc, extra->cmax_attrs) + { + int i = lfirst_int(lc); + + nulls[i - 1] = false; + values[i - 1] = CommandIdGetDatum(InvalidCommandId); + } + + /* Fill tableoids with valid values */ + forboth(lc, extra->tableoid_attrs, lc2, extra->tableoid_values) + { + int i = lfirst_int(lc); + Oid tableoid = lfirst_oid(lc2); + + nulls[i - 1] = false; + values[i - 1] = ObjectIdGetDatum(tableoid); + } + } *** a/src/include/foreign/fdwapi.h --- b/src/include/foreign/fdwapi.h *************** *** 224,229 **** typedef struct FdwRoutine --- 224,247 ---- InitializeWorkerForeignScan_function InitializeWorkerForeignScan; } FdwRoutine; + /* + * Struct for extra information for making scan tuples described by + * fdw_scan_tlist + */ + typedef struct FdwScanTupleExtraData + { + List *fdw_scan_tlist; /* tlist describing the structure of tuple */ + bool has_sys_attrs; /* does tuple contain any system attrs other + * than ctids and oids */ + List *xmin_attrs; /* attribute numbers of xmins in tuple */ + List *cmin_attrs; /* attribute numbers of cmins in tuple */ + List *xmax_attrs; /* attribute numbers 
of xmaxs in tuple */ + List *cmax_attrs; /* attribute numbers of cmaxs in tuple */ + List *tableoid_attrs; /* attribute numbers of tableoids in tuple */ + List *tableoid_values; /* OIDs of tableoids in tuple */ + List *range_table; /* EState's list of RangeTblEntry */ + } FdwScanTupleExtraData; + /* Functions in foreign/foreign.c */ extern FdwRoutine *GetFdwRoutine(Oid fdwhandler); *************** *** 234,238 **** extern FdwRoutine *GetFdwRoutineForRelation(Relation relation, bool makecopy); --- 252,260 ---- extern bool IsImportableForeignTable(const char *tablename, ImportForeignSchemaStmt *stmt); extern Path *GetExistingLocalJoinPath(RelOptInfo *joinrel); + extern FdwScanTupleExtraData *GetFdwScanTupleExtraData(List *fdw_scan_tlist, + List *range_table); + extern void FillFdwScanTupleSysAttrs(FdwScanTupleExtraData *extra, + Datum *values, bool *nulls); #endif /* FDWAPI_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers