On 2010-08-31 12:07 AM +0300, I wrote:
The patch needs a bit more comments and some cleaning up ..
Here's a cleaned up version with a bit more comments.
This patch still silently breaks pg_parse_and_rewrite(). We only use it
in our source code for SQL-language functions, so I think we should
deprecate it in favor of a function which would do the right thing for
SQL functions. Thoughts?
Regards,
Marko Tiikkaja
*** a/src/backend/catalog/pg_proc.c
--- b/src/backend/catalog/pg_proc.c
***************
*** 832,838 **** fmgr_sql_validator(PG_FUNCTION_ARGS)
proc->proargtypes.values,
proc->pronargs);
(void) check_sql_fn_retval(funcoid, proc->prorettype,
!
querytree_list,
NULL, NULL);
}
else
--- 832,838 ----
proc->proargtypes.values,
proc->pronargs);
(void) check_sql_fn_retval(funcoid, proc->prorettype,
!
llast(querytree_list),
NULL, NULL);
}
else
*** a/src/backend/executor/functions.c
--- b/src/backend/executor/functions.c
***************
*** 90,107 **** typedef struct
ParamListInfo paramLI; /* Param list representing current args
*/
Tuplestorestate *tstore; /* where we accumulate result tuples */
JunkFilter *junkFilter; /* will be NULL if function returns
VOID */
! /* head of linked list of execution_state records */
! execution_state *func_state;
} SQLFunctionCache;
typedef SQLFunctionCache *SQLFunctionCachePtr;
/* non-export function prototypes */
! static execution_state *init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK);
static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
--- 90,107 ----
ParamListInfo paramLI; /* Param list representing current args
*/
Tuplestorestate *tstore; /* where we accumulate result tuples */
+ Snapshot snapshot;
JunkFilter *junkFilter; /* will be NULL if function returns
VOID */
! List *func_state;
} SQLFunctionCache;
typedef SQLFunctionCache *SQLFunctionCachePtr;
/* non-export function prototypes */
! static List *init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK);
static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
***************
*** 123,183 **** static void sqlfunction_destroy(DestReceiver *self);
/* Set up the list of per-query execution_state records for a SQL function */
! static execution_state *
init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK)
{
! execution_state *firstes = NULL;
! execution_state *preves = NULL;
execution_state *lasttages = NULL;
! ListCell *qtl_item;
! foreach(qtl_item, queryTree_list)
{
! Query *queryTree = (Query *) lfirst(qtl_item);
! Node *stmt;
! execution_state *newes;
! Assert(IsA(queryTree, Query));
! if (queryTree->commandType == CMD_UTILITY)
! stmt = queryTree->utilityStmt;
! else
! stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
! /* Precheck all commands for validity in a function */
! if (IsA(stmt, TransactionStmt))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a SQL
function",
!
CreateCommandTag(stmt))));
! if (fcache->readonly_func && !CommandIsReadOnly(stmt))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a
non-volatile function",
!
CreateCommandTag(stmt))));
! newes = (execution_state *) palloc(sizeof(execution_state));
! if (preves)
! preves->next = newes;
! else
! firstes = newes;
! newes->next = NULL;
! newes->status = F_EXEC_START;
! newes->setsResult = false; /* might change below */
! newes->lazyEval = false; /* might change below */
! newes->stmt = stmt;
! newes->qd = NULL;
! if (queryTree->canSetTag)
! lasttages = newes;
! preves = newes;
}
/*
--- 123,200 ----
/* Set up the list of per-query execution_state records for a SQL function */
! static List *
init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK)
{
! execution_state *firstes;
! execution_state *preves;
execution_state *lasttages = NULL;
! List *eslist;
! ListCell *lc1;
! ListCell *lc2;
! List *qtlist;
! Query *queryTree;
!
!
! eslist = NIL;
! foreach(lc1, queryTree_list)
{
! qtlist = (List *) lfirst(lc1);
! firstes = NULL;
! preves = NULL;
! foreach(lc2, qtlist)
! {
! Node *stmt;
! execution_state *newes;
! queryTree = (Query *) lfirst(lc2);
! Assert(IsA(queryTree, Query));
! if (queryTree->commandType == CMD_UTILITY)
! stmt = queryTree->utilityStmt;
! else
! stmt = (Node *) pg_plan_query(queryTree, 0,
NULL);
! /* Precheck all commands for validity in a function */
! if (IsA(stmt, TransactionStmt))
! ereport(ERROR,
!
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a
SQL function",
!
CreateCommandTag(stmt))));
! if (fcache->readonly_func && !CommandIsReadOnly(stmt))
! ereport(ERROR,
!
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a
non-volatile function",
!
CreateCommandTag(stmt))));
!
! newes = (execution_state *)
palloc(sizeof(execution_state));
! if (preves)
! preves->next = newes;
! else
! firstes = newes;
! newes->next = NULL;
! newes->status = F_EXEC_START;
! newes->setsResult = false; /* might change
below */
! newes->lazyEval = false; /* might change below */
! newes->stmt = stmt;
! newes->qd = NULL;
! if (queryTree->canSetTag)
! lasttages = newes;
!
! preves = newes;
! }
!
! eslist = lappend(eslist, firstes);
}
/*
***************
*** 210,216 **** init_execution_state(List *queryTree_list,
}
}
! return firstes;
}
/* Initialize the SQLFunctionCache for a SQL function */
--- 227,233 ----
}
}
! return eslist;
}
/* Initialize the SQLFunctionCache for a SQL function */
***************
*** 342,348 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
*/
fcache->returnsTuple = check_sql_fn_retval(foid,
rettype,
!
queryTree_list,
NULL,
&fcache->junkFilter);
--- 359,365 ----
*/
fcache->returnsTuple = check_sql_fn_retval(foid,
rettype,
!
llast(queryTree_list),
NULL,
&fcache->junkFilter);
***************
*** 374,397 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
static void
postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
{
- Snapshot snapshot;
DestReceiver *dest;
Assert(es->qd == NULL);
! /*
! * In a read-only function, use the surrounding query's snapshot;
! * otherwise take a new snapshot for each query. The snapshot should
! * include a fresh command ID so that all work to date in this
transaction
! * is visible.
! */
! if (fcache->readonly_func)
! snapshot = GetActiveSnapshot();
! else
! {
! CommandCounterIncrement();
! snapshot = GetTransactionSnapshot();
! }
/*
* If this query produces the function result, send its output to the
--- 391,401 ----
static void
postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
{
DestReceiver *dest;
Assert(es->qd == NULL);
! Assert(ActiveSnapshotSet());
/*
* If this query produces the function result, send its output to the
***************
*** 415,427 **** postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
if (IsA(es->stmt, PlannedStmt))
es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
fcache->src,
! snapshot,
InvalidSnapshot,
dest,
fcache->paramLI, 0);
else
es->qd = CreateUtilityQueryDesc(es->stmt,
fcache->src,
!
snapshot,
dest,
fcache->paramLI);
--- 419,432 ----
if (IsA(es->stmt, PlannedStmt))
es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
fcache->src,
!
GetActiveSnapshot(),
!
InvalidSnapshot,
dest,
fcache->paramLI, 0);
else
es->qd = CreateUtilityQueryDesc(es->stmt,
fcache->src,
!
GetActiveSnapshot(),
dest,
fcache->paramLI);
***************
*** 617,622 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 622,629 ----
execution_state *es;
TupleTableSlot *slot;
Datum result;
+ List *eslist;
+ ListCell *eslc;
/*
* Switch to context in which the fcache lives. This ensures that
***************
*** 668,680 **** fmgr_sql(PG_FUNCTION_ARGS)
init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
}
! es = fcache->func_state;
/*
* Convert params to appropriate format if starting a fresh execution.
(If
* continuing execution, we can re-use prior params.)
*/
! if (es && es->status == F_EXEC_START)
postquel_sub_params(fcache, fcinfo);
/*
--- 675,687 ----
init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
}
! eslist = fcache->func_state;
/*
* Convert params to appropriate format if starting a fresh execution.
(If
* continuing execution, we can re-use prior params.)
*/
! if (linitial(eslist) && ((execution_state *) linitial(eslist))->status
== F_EXEC_START)
postquel_sub_params(fcache, fcinfo);
/*
***************
*** 687,694 **** fmgr_sql(PG_FUNCTION_ARGS)
/*
* Find first unfinished query in function.
*/
! while (es && es->status == F_EXEC_DONE)
! es = es->next;
/*
* Execute each command in the function one after another until we
either
--- 694,709 ----
/*
* Find first unfinished query in function.
*/
! foreach(eslc, eslist)
! {
! es = (execution_state *) lfirst(eslc);
!
! while (es && es->status == F_EXEC_DONE)
! es = es->next;
!
! if (es)
! break;
! }
/*
* Execute each command in the function one after another until we
either
***************
*** 699,706 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 714,744 ----
bool completed;
if (es->status == F_EXEC_START)
+ {
+ if (!fcache->readonly_func)
+ {
+ /*
+ * In a read-only function, use the surrounding
query's snapshot;
+ * otherwise take a new snapshot if we don't
have one yet. The
+ * snapshot should include a fresh command ID
so that all work to
+ * date in this transaction is visible.
+ */
+ if (!fcache->snapshot)
+ {
+ CommandCounterIncrement();
+ fcache->snapshot =
RegisterSnapshot(GetTransactionSnapshot());
+ PushActiveSnapshot(fcache->snapshot);
+ }
+ else
+ PushUpdatedSnapshot(fcache->snapshot);
+ }
+
postquel_start(es, fcache);
+ if (!fcache->readonly_func)
+ PopActiveSnapshot();
+ }
+
completed = postquel_getnext(es, fcache);
/*
***************
*** 726,731 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 764,788 ----
if (es->status != F_EXEC_DONE)
break;
es = es->next;
+
+ if (!es)
+ {
+ eslc = lnext(eslc);
+ if (!eslc)
+ break;
+
+ es = (execution_state *) lfirst(eslc);
+
+ /* make sure we take a new snapshot for this query list
*/
+ if (!fcache->readonly_func)
+ {
+ Assert(fcache->snapshot != InvalidSnapshot);
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
+ }
+ else
+ Assert(fcache->snapshot == InvalidSnapshot);
+ }
}
/*
***************
*** 794,799 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 851,861 ----
PointerGetDatum(fcache));
fcache->shutdown_reg = false;
}
+
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
}
else
{
***************
*** 820,825 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 882,892 ----
PointerGetDatum(fcache));
fcache->shutdown_reg = false;
}
+
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
}
}
else
***************
*** 850,855 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 917,927 ----
/* Clear the tuplestore, but keep it for next time */
tuplestore_clear(fcache->tstore);
+
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
}
/*
***************
*** 858,868 **** fmgr_sql(PG_FUNCTION_ARGS)
*/
if (es == NULL)
{
! es = fcache->func_state;
! while (es)
{
! es->status = F_EXEC_START;
! es = es->next;
}
}
--- 930,943 ----
*/
if (es == NULL)
{
! foreach(eslc, fcache->func_state)
{
! es = (execution_state *) lfirst(eslc);
! while (es)
! {
! es->status = F_EXEC_START;
! es = es->next;
! }
}
}
***************
*** 913,931 **** sql_exec_error_callback(void *arg)
{
execution_state *es;
int query_num;
- es = fcache->func_state;
query_num = 1;
! while (es)
{
! if (es->qd)
{
! errcontext("SQL function \"%s\" statement %d",
! fcache->fname, query_num);
! break;
}
- es = es->next;
- query_num++;
}
if (es == NULL)
{
--- 988,1011 ----
{
execution_state *es;
int query_num;
+ ListCell *lc;
query_num = 1;
!
! foreach(lc, fcache->func_state)
{
! es = (execution_state *) lfirst(lc);
! while (es)
{
! if (es->qd)
! {
! errcontext("SQL function \"%s\"
statement %d",
! fcache->fname,
query_num);
! break;
! }
! es = es->next;
! query_num++;
}
}
if (es == NULL)
{
***************
*** 956,973 **** static void
ShutdownSQLFunction(Datum arg)
{
SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
! execution_state *es = fcache->func_state;
! while (es != NULL)
{
! /* Shut down anything still running */
! if (es->status == F_EXEC_RUN)
! postquel_end(es);
! /* Reset states to START in case we're called again */
! es->status = F_EXEC_START;
! es = es->next;
}
/* Release tuplestore if we have one */
if (fcache->tstore)
tuplestore_end(fcache->tstore);
--- 1036,1064 ----
ShutdownSQLFunction(Datum arg)
{
SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
! execution_state *es;
! ListCell *lc;
! foreach(lc, fcache->func_state)
{
! es = (execution_state *) lfirst(lc);
!
! while (es)
! {
! /* Shut down anything still running */
! if (es->status == F_EXEC_RUN)
! postquel_end(es);
! /* Reset states to START in case we're called again */
! es->status = F_EXEC_START;
! es = es->next;
! }
}
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
+
/* Release tuplestore if we have one */
if (fcache->tstore)
tuplestore_end(fcache->tstore);
*** a/src/backend/executor/spi.c
--- b/src/backend/executor/spi.c
***************
*** 1769,1774 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
--- 1769,1775 ----
SPITupleTable *my_tuptable = NULL;
int res = 0;
bool have_active_snap = ActiveSnapshotSet();
+ bool registered_snap = false;
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
***************
*** 1872,1879 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
}
else
{
!
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
}
}
else
--- 1873,1882 ----
}
else
{
! snapshot =
RegisterSnapshot(GetTransactionSnapshot());
! PushActiveSnapshot(snapshot);
pushed_active_snap = true;
+ registered_snap = true;
}
}
else
***************
*** 1966,1975 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
--- 1969,1991 ----
*/
if (!read_only)
CommandCounterIncrement();
+
+ /*
+ * If we took a new snapshot for this query list, unregister it
and
+ * make sure we take a new one for the next list.
+ */
+ if (registered_snap)
+ {
+ UnregisterSnapshot(snapshot);
+ snapshot = InvalidSnapshot;
+ }
}
fail:
+ if (registered_snap)
+ UnregisterSnapshot(snapshot);
+
/* We no longer need the cached plan refcount, if any */
if (cplan)
ReleaseCachedPlan(cplan, true);
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
***************
*** 537,547 **** pg_parse_and_rewrite(const char *query_string, /* string to
execute */
{
Node *parsetree = (Node *) lfirst(list_item);
! querytree_list = list_concat(querytree_list,
!
pg_analyze_and_rewrite(parsetree,
!
query_string,
!
paramTypes,
!
numParams));
}
return querytree_list;
--- 537,547 ----
{
Node *parsetree = (Node *) lfirst(list_item);
! querytree_list = lappend(querytree_list,
!
pg_analyze_and_rewrite(parsetree,
!
query_string,
!
paramTypes,
!
numParams));
}
return querytree_list;
*** a/src/backend/tcop/pquery.c
--- b/src/backend/tcop/pquery.c
***************
*** 170,180 **** ProcessQuery(PlannedStmt *plan,
elog(DEBUG3, "ProcessQuery");
/*
- * Must always set a snapshot for plannable queries.
- */
- PushActiveSnapshot(GetTransactionSnapshot());
-
- /*
* Create the QueryDesc object
*/
queryDesc = CreateQueryDesc(plan, sourceText,
--- 170,175 ----
***************
*** 234,241 **** ProcessQuery(PlannedStmt *plan,
/* Now take care of any queued AFTER triggers */
AfterTriggerEndQuery(queryDesc->estate);
- PopActiveSnapshot();
-
/*
* Now, we close down all the scans and free allocated resources.
*/
--- 229,234 ----
***************
*** 1220,1225 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1213,1219 ----
char *completionTag)
{
ListCell *stmtlist_item;
+ Snapshot snapshot = InvalidSnapshot;
/*
* If the destination is DestRemoteExecute, change to DestNone. The
***************
*** 1262,1267 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1256,1270 ----
if (log_executor_stats)
ResetUsage();
+ /* if no snapshot is set, grab a new one and register
it */
+ if (snapshot == InvalidSnapshot)
+ {
+ snapshot =
RegisterSnapshot(GetTransactionSnapshot());
+ PushActiveSnapshot(snapshot);
+ }
+ else
+ PushUpdatedSnapshot(snapshot);
+
if (pstmt->canSetTag)
{
/* statement can set tag string */
***************
*** 1279,1284 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1282,1289 ----
altdest, NULL);
}
+ PopActiveSnapshot();
+
if (log_executor_stats)
ShowUsage("EXECUTOR STATISTICS");
***************
*** 1291,1301 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1296,1320 ----
*
* These are assumed canSetTag if they're the only stmt
in the
* portal.
+ *
+ * NotifyStmt is the only utility statement allowed in
a list of
+ * rewritten queries, and it doesn't need a snapshot so
we don't
+ * need to worry about it. However, if the list has
only one
+ * statement and it's a utility statement, we are not
allowed to
+ * take a snapshot. See the first comment in
PortalRunUtility().
*/
if (list_length(portal->stmts) == 1)
+ {
+ Assert(snapshot == InvalidSnapshot);
+
PortalRunUtility(portal, stmt, isTopLevel,
dest, completionTag);
+ }
else
+ {
+ Assert(IsA(stmt, NotifyStmt));
+
PortalRunUtility(portal, stmt, isTopLevel,
altdest, NULL);
+ }
}
/*
***************
*** 1313,1318 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1332,1340 ----
MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
}
+ if (snapshot != InvalidSnapshot)
+ UnregisterSnapshot(snapshot);
+
/*
* If a command completion tag was supplied, use it. Otherwise use the
* portal's commandTag as the default completion tag.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers