On 4/4/17 7:44 PM, Craig Ringer wrote:
On 5 April 2017 at 08:23, Craig Ringer <cr...@2ndquadrant.com> wrote:
On 5 April 2017 at 08:00, Craig Ringer <cr...@2ndquadrant.com> wrote:
This patch fails to update the documentation at all.
https://www.postgresql.org/docs/devel/static/spi.html
I'll fix that soon.
missing newline
Fixed.
+/* Execute a previously prepared plan with a callback Destination */
caps "Destination"
Hmm, I capitalized it since DestReceiver is capitalized. I guess it's
best to just drop Destination.
+ // XXX throw error if callback is set
Fixed (opted to just use an Assert).
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
Presumably that's a default destination you're then supposed to modify
with your own callbacks? There aren't any comments to indicate what's
going on here.
Correct. Added the following comment:
/*
* This is strictly a starting point for creating a callback. It should not
* actually be used.
*/
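For anyone following along, here is a rough sketch of how I expect the template to be used: copy it into your own state struct, then override only the callbacks you need. Everything in it (Receiver, SumState, run_sum and friends) is a hypothetical stand-in so that it compiles on its own; it is not the real DestReceiver API.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a receiver struct; NOT the real DestReceiver. */
typedef struct Receiver Receiver;
struct Receiver
{
    int  (*receive) (Receiver *self, int value);
    void (*startup) (Receiver *self);
    void (*shutdown) (Receiver *self);
};

static int  noop_receive(Receiver *self, int value) { (void) self; (void) value; return 1; }
static void noop_hook(Receiver *self) { (void) self; }

/* Template: only a starting point to copy from, never used directly. */
static const Receiver template_receiver = { noop_receive, noop_hook, noop_hook };

/* Caller state embeds the receiver first, so a Receiver * can be cast back. */
typedef struct
{
    Receiver pub;
    long     sum;
    int      rows;
} SumState;

static int
sum_receive(Receiver *self, int value)
{
    SumState *state = (SumState *) self;

    state->sum += value;
    state->rows++;
    return 1;
}

static void
sum_state_init(SumState *state)
{
    assert(state != NULL);
    /* Copy the template, then override just the callback we care about. */
    memcpy(&state->pub, &template_receiver, sizeof(Receiver));
    state->pub.receive = sum_receive;
    state->sum = 0;
    state->rows = 0;
}

/* Push the values 1..n through the receiver, roughly as an executor would. */
long
run_sum(int n)
{
    SumState state;
    int      i;

    sum_state_init(&state);
    state.pub.startup(&state.pub);
    for (i = 1; i <= n; i++)
        state.pub.receive(&state.pub, i);
    state.pub.shutdown(&state.pub);
    return state.sum;
}
```

The callbacks that are not overridden stay as the do-nothing defaults, which is why the template itself should never be handed to the executor as-is.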
Comments on patch 2:
If this is the "bare minimum" patch, what is pending? Does it impose
any downsides or limits?
No limits. I'm not aware of any downsides.
It's "bare minimum" because a follow-on step is to allow different
methods of returning results. In particular, my testing indicates that
patch 1 + returning a dict of lists (as opposed to the current list of
dicts) results in a 460% improvement vs ~30% with patch 2.
+/* Get switch execution contexts */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
Comment makes no sense to me. This seems something like memory context
switch, where you supply the new and return the old. But the comment
doesn't explain it at all.
Comment changed to
/* Switch execution context (similar to MemoryContextSwitchTo) */
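To illustrate the pattern the comment refers to, here is a minimal sketch of "install the new one, hand back the old one so the caller can restore it". ExecContext, switch_context and run_nested are hypothetical names for illustration only, not the plpython code.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical context type; stands in for PLyExecutionContext. */
typedef struct ExecContext
{
    const char *name;
} ExecContext;

static ExecContext *current_context = NULL;

/* Install "next" as current and return the previous context. */
ExecContext *
switch_context(ExecContext *next)
{
    ExecContext *previous = current_context;

    current_context = next;
    return previous;
}

/* Nest two switches and restore them in reverse order; returns 1 on success. */
int
run_nested(void)
{
    ExecContext  outer = { "outer" };
    ExecContext  inner = { "inner" };
    ExecContext *saved_outer;
    ExecContext *saved_inner;
    int          ok;

    saved_outer = switch_context(&outer);   /* whatever was current before */
    saved_inner = switch_context(&inner);   /* hands back &outer */
    assert(current_context == &inner);

    ok = (saved_inner == &outer);
    switch_context(saved_inner);            /* restore outer */
    ok = ok && (current_context == &outer);
    switch_context(saved_outer);            /* restore the original */
    ok = ok && (current_context == saved_outer);
    return ok;
}
```

The caller keeps the returned pointer and passes it back when done, which is the same shape as the save and restore done around MemoryContextSwitchTo() calls.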
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
These are declared in plpy_spi.c. Why aren't they static?
Derp. Fixed.
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
int rv;
- volatile MemoryContext oldcontext;
- volatile ResourceOwner oldowner;
Unnecessary code movement.
IMHO it reads better that way. I've left it for now so $COMMITTER can
decide, but if it really bugs you let me know and I'll swap it.
In PLy_Callback_New, I think your use of a sub-context is sensible. Is
it necessary to palloc0 though?
Hrm, maybe not... but it seems like cheap insurance considering the
amount of other stuff involved in just starting a new SPI call. And
honestly, I'd rather not mess with it at this point. :) I have added an
XXX comment though.
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
The code here looks like it could be part of spi.c's callback support,
rather than plpy_spi specific, since you provide a destroy callback in
the SPI callback struct.
I added this for use in PG_CATCH() blocks, because it's not clear to me
that the portal gets cleaned up in those cases. It's certainly possible
that it's pointless.
FWIW, I'm pretty sure I copied that pattern from somewhere else...
probably plpgsql or pltcl.
+ /* We need to store this because the TupleDesc the receive function gets has no names. */
+ myState->desc = typeinfo;
Is it safe to just store the pointer to the TupleDesc here? What's the lifetime?
Only Portal lifetime.
+ * will clean it up when the time is right. XXX This might result in a leak
+ * if an error happens and the result doesn't get dereferenced.
+ */
+ MemoryContextSwitchTo(TopMemoryContext);
+ result->tupdesc = CreateTupleDescCopy(typeinfo);
especially given this XXX comment...
I've changed the comment to the following. Hopefully this clarifies things:
/*
* Save tuple descriptor for later use by result set metadata
* functions. Save it in TopMemoryContext so that it survives outside of
* an SPI context. We trust that PLy_result_dealloc() will clean it up
* when the time is right. The difference between result and everything
* else is that result needs to survive after the portal is destroyed,
* because result is what's handed back to the plpython function. While
* it's tempting to use something other than TopMemoryContext, that won't
* work: the user could potentially put result into the global dictionary,
* which means it could survive as long as the session does. This might
* result in a leak if an error happens and the result doesn't get
* dereferenced, but if that happens it means the python GC has failed us,
* at which point we probably have bigger problems.
*
* This still isn't perfect though; if something the result tupledesc
* references has its OID changed then the tupledesc will be invalid. I'm
* not sure it's worth worrying about that though.
*/
Updated patches attached, but I still need to update the docs.
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG http://OpenSCG.com
From 0a2ef661f55a047763a43b0eebd7483760e4a427 Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 5 Apr 2017 20:52:39 -0500
Subject: [PATCH 1/2] Add SPI_execute_callback
Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
src/backend/executor/spi.c | 80 ++++++++++++++++++++++++++++++++++++++++------
src/backend/tcop/dest.c | 15 +++++++++
src/include/executor/spi.h | 4 +++
src/include/tcop/dest.h | 1 +
4 files changed, 90 insertions(+), 10 deletions(-)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index ca547dc6d9..4f6c3011f9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount);
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -321,7 +322,35 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -355,7 +384,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan with a callback */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+ _SPI_convert_params(plan->nargs, plan->argtypes,
+ Values, Nulls),
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -384,7 +440,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -425,7 +481,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -472,7 +528,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1907,7 +1963,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount)
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1918,6 +1975,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2037,7 +2095,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
bool canSetTag = stmt->canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2082,7 +2139,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
if (stmt->utilityStmt == NULL)
{
@@ -2108,6 +2166,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char completionTag[COMPLETION_TAG_BUFSIZE];
+ Assert(!callback);
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
@@ -2281,7 +2340,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
switch (operation)
{
case CMD_SELECT:
- if (queryDesc->dest->mydest != DestSPI)
+ if (queryDesc->dest->mydest != DestSPI &&
+ queryDesc->dest->mydest != DestSPICallback)
{
/* Don't return SPI_OK_SELECT if we're discarding result */
res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..f68b6e1b51 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,15 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+/*
+ * This is strictly a starting point for creating a callback. It should not
+ * actually be used.
+ */
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -126,6 +135,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
@@ -172,6 +184,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -216,6 +229,7 @@ NullCommand(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -262,6 +276,7 @@ ReadyForQuery(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 94a805d477..13719e1df5 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -80,11 +80,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
DestRemoteExecute, /* sent to frontend, in Execute command */
DestRemoteSimple, /* sent to frontend, w/no catalog access */
DestSPI, /* results sent to SPI manager */
+ DestSPICallback, /* results sent to user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
--
2.11.1
From d693fa42135e5f773cc8affcd26eba4d2ef22f2b Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 5 Apr 2017 21:30:39 -0500
Subject: [PATCH 2/2] Switch plpython to using SPI_execute_callback
This is a bare minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Simple testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
src/pl/plpython/plpy_main.c | 13 ++
src/pl/plpython/plpy_main.h | 3 +
src/pl/plpython/plpy_spi.c | 313 ++++++++++++++++++++++++++++++++------------
3 files changed, 248 insertions(+), 81 deletions(-)
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
return PLy_execution_contexts;
}
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+ PLyExecutionContext *last = PLy_execution_contexts;
+
+ if (PLy_execution_contexts == NULL)
+ elog(ERROR, "no Python function is currently executing");
+
+ PLy_execution_contexts = new;
+
+ return last;
+}
+
MemoryContext
PLy_get_scratch_context(PLyExecutionContext *context)
{
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..fe30dbc14b 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
/* Get the current execution context */
extern PLyExecutionContext *PLy_current_execution_context(void);
+/* Switch execution context (similar to MemoryContextSwitchTo) */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
+
/* Get the scratch memory context for specified execution context */
extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index c6856ccbac..236cc6d998 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,9 +28,27 @@
#include "plpy_procedure.h"
#include "plpy_resultobject.h"
+typedef struct
+{
+ DestReceiver pub;
+ PLyExecutionContext *exec_ctx;
+ MemoryContext parent_ctx;
+ MemoryContext cb_ctx;
+ TupleDesc desc;
+ PLyTypeInfo *args;
+
+ PyObject *result;
+} CallbackState;
+
+static void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+static void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
static PyObject *PLy_spi_execute_query(char *query, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
uint64 rows, int status);
static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
@@ -195,6 +213,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
PyObject *
PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback;
volatile int nargs;
int i,
rv;
@@ -237,12 +257,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
oldcontext = CurrentMemoryContext;
oldowner = CurrentResourceOwner;
+ callback = PLy_Callback_New(exec_ctx);
PLy_spi_subtransaction_begin(oldcontext, oldowner);
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
char *volatile nulls;
volatile int j;
@@ -288,9 +308,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
}
- rv = SPI_execute_plan(plan->plan, plan->values, nulls,
- exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+ exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
if (nargs > 0)
pfree(nulls);
@@ -315,9 +336,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
for (i = 0; i < nargs; i++)
{
@@ -343,9 +366,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
static PyObject *
PLy_spi_execute_query(char *query, long limit)
{
- int rv;
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback = PLy_Callback_New(exec_ctx);
volatile MemoryContext oldcontext;
volatile ResourceOwner oldowner;
+ int rv;
PyObject *ret = NULL;
oldcontext = CurrentMemoryContext;
@@ -355,20 +380,22 @@ PLy_spi_execute_query(char *query, long limit)
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
pg_verifymbstr(query, strlen(query), false);
- rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
PLy_spi_subtransaction_commit(oldcontext, oldowner);
}
PG_CATCH();
{
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
if (rv < 0)
{
@@ -382,94 +409,218 @@ PLy_spi_execute_query(char *query, long limit)
return ret;
}
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
+{
+ MemoryContext oldcontext, cb_ctx;
+ CallbackState *callback;
+
+ /* XXX does this really need palloc0? */
+ callback = palloc0(sizeof(CallbackState));
+
+ /*
+ * Use a new context to make cleanup easier. Allocate it in the current
+ * context so we don't have to worry about cleaning it up if there's an
+ * error.
+ */
+ cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "PL/Python callback context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cb_ctx);
+ callback->parent_ctx = oldcontext;
+ callback->cb_ctx = cb_ctx;
+ memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+ callback->pub.receiveSlot = PLy_CSreceive;
+ callback->pub.rStartup = PLy_CSStartup;
+ callback->pub.rDestroy = PLy_CSDestroy;
+ callback->exec_ctx = exec_ctx;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+ if (callback)
+ {
+ if (callback->cb_ctx)
+ (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+ pfree(callback);
+ }
+
+ return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
{
+ MemoryContext oldctx;
+
+ /* The result info needs to be in the parent context */
+ oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+ myState->result = PLy_result_new();
+ if (myState->result == NULL)
+ PLy_elog(ERROR, "could not create new result object");
+
+ MemoryContextSwitchTo(oldctx);
+ return (PLyResultObject *) myState->result;
+}
+
+static void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ PLyExecutionContext *old_exec_ctx;
+ CallbackState *myState = (CallbackState *) self;
PLyResultObject *result;
- volatile MemoryContext oldcontext;
+ PLyTypeInfo *args;
+ MemoryContext mctx, old_mctx;
+
+ /*
+ * We may be in a different execution context when we're called, so switch
+ * back to our original one.
+ */
+ mctx = myState->cb_ctx;
+ old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ old_mctx = MemoryContextSwitchTo(mctx);
+
+ /*
+ * We need to store this because the TupleDesc that the receive function
+ * gets has no names.
+ */
+ myState->desc = typeinfo;
+
+ /* Setup type conversion info */
+ myState->args = args = palloc0(sizeof(PLyTypeInfo));
+ PLy_typeinfo_init(args, mctx);
+ PLy_input_tuple_funcs(args, typeinfo);
+
+ /* result is actually myState.result */
+ result = PLyCSNewResult(myState);
+
+ /*
+ * Save tuple descriptor for later use by result set metadata
+ * functions. Save it in TopMemoryContext so that it survives outside of
+ * an SPI context. We trust that PLy_result_dealloc() will clean it up
+ * when the time is right. The difference between result and everything
+ * else is that result needs to survive after the portal is destroyed,
+ * because result is what's handed back to the plpython function. While
+ * it's tempting to use something other than TopMemoryContext, that won't
+ * work: the user could potentially put result into the global dictionary,
+ * which means it could survive as long as the session does. This might
+ * result in a leak if an error happens and the result doesn't get
+ * dereferenced, but if that happens it means the python GC has failed us,
+ * at which point we probably have bigger problems.
+ *
+ * This still isn't perfect though; if something the result tupledesc
+ * references has its OID changed then the tupledesc will be invalid. I'm
+ * not sure it's worth worrying about that though.
+ */
+ MemoryContextSwitchTo(TopMemoryContext);
+ result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+ MemoryContextSwitchTo(old_mctx);
+ PLy_switch_execution_context(old_exec_ctx);
+}
+
+static void
+PLy_CSDestroy(DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ MemoryContext cb_ctx = myState->cb_ctx;
- result = (PLyResultObject *) PLy_result_new();
- Py_DECREF(result->status);
- result->status = PyInt_FromLong(status);
+ MemoryContextDelete(cb_ctx);
+ myState->cb_ctx = 0;
+}
- if (status > 0 && tuptable == NULL)
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ TupleDesc desc = myState->desc;
+ PLyTypeInfo *args = myState->args;
+ PLyResultObject *result = (PLyResultObject *) myState->result;
+ PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ MemoryContext scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+ MemoryContext oldcontext = CurrentMemoryContext;
+ int rv = 1;
+ PyObject *row = NULL;
+
+ /* Verify saved state matches incoming slot */
+ Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+ Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ /*
+ * Do the work in the scratch context to avoid leaking memory from the
+ * datatype output function calls.
+ */
+ MemoryContextSwitchTo(scratch_context);
+
+ PG_TRY();
{
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
}
- else if (status > 0 && tuptable != NULL)
+ PG_CATCH();
{
- PLyTypeInfo args;
- MemoryContext cxt;
+ Py_XDECREF(row);
+ MemoryContextSwitchTo(oldcontext);
+ PLy_switch_execution_context(old_exec_ctx);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ /*
+ * If we tried to do this in the PG_CATCH we'd have to mark row
+ * as volatile, but that won't work with PyList_Append, so just
+ * test the error code after doing Py_DECREF().
+ */
+ if (row)
+ {
+ rv = PyList_Append(result->rows, row);
+ Py_DECREF(row);
+ }
- cxt = AllocSetContextCreate(CurrentMemoryContext,
- "PL/Python temp context",
- ALLOCSET_DEFAULT_SIZES);
- PLy_typeinfo_init(&args, cxt);
+ if (rv != 0)
+ ereport(ERROR,
+ (errmsg("unable to append value to list")));
- oldcontext = CurrentMemoryContext;
- PG_TRY();
- {
- MemoryContext oldcontext2;
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(scratch_context);
+ PLy_switch_execution_context(old_exec_ctx);
- if (rows)
- {
- uint64 i;
-
- /*
- * PyList_New() and PyList_SetItem() use Py_ssize_t for list
- * size and list indices; so we cannot support a result larger
- * than PY_SSIZE_T_MAX.
- */
- if (rows > (uint64) PY_SSIZE_T_MAX)
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("query result has too many rows to fit in a Python list")));
-
- Py_DECREF(result->rows);
- result->rows = PyList_New(rows);
-
- PLy_input_tuple_funcs(&args, tuptable->tupdesc);
- for (i = 0; i < rows; i++)
- {
- PyObject *row = PLyDict_FromTuple(&args,
- tuptable->vals[i],
- tuptable->tupdesc);
+ return true;
+}
- PyList_SetItem(result->rows, i, row);
- }
- }
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+ PLyResultObject *result = (PLyResultObject *) callback->result;
+
+ /* If status < 0 this stuff would just get thrown away anyway. */
+ if (status > 0)
+ {
+ if (!result)
+ {
/*
- * Save tuple descriptor for later use by result set metadata
- * functions. Save it in TopMemoryContext so that it survives
- * outside of an SPI context. We trust that PLy_result_dealloc()
- * will clean it up when the time is right. (Do this as late as
- * possible, to minimize the number of ways the tupdesc could get
- * leaked due to errors.)
+ * This happens if the command returned no results. Create a dummy result set.
*/
- oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
- result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
- MemoryContextSwitchTo(oldcontext2);
+ result = PLyCSNewResult(callback);
+ callback->result = (PyObject *) result;
}
- PG_CATCH();
- {
- MemoryContextSwitchTo(oldcontext);
- MemoryContextDelete(cxt);
- Py_DECREF(result);
- PG_RE_THROW();
- }
- PG_END_TRY();
- MemoryContextDelete(cxt);
- SPI_freetuptable(tuptable);
+ Py_DECREF(result->status);
+ result->status = PyInt_FromLong(status);
+ Py_DECREF(result->nrows);
+ result->nrows = (rows > (uint64) LONG_MAX) ?
+ PyFloat_FromDouble((double) rows) :
+ PyInt_FromLong((long) rows);
}
return (PyObject *) result;
--
2.11.1
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)