On 4/4/17 7:44 PM, Craig Ringer wrote:
On 5 April 2017 at 08:23, Craig Ringer <cr...@2ndquadrant.com> wrote:
On 5 April 2017 at 08:00, Craig Ringer <cr...@2ndquadrant.com> wrote:
This patch fails to update the documentation at all.

https://www.postgresql.org/docs/devel/static/spi.html

I'll fix that soon.

missing newline

Fixed.

+/* Execute a previously prepared plan with a callback Destination */


caps "Destination"

Hmm, I capitalized it since DestReceiver is capitalized. I guess it's best to just drop Destination.

+                // XXX throw error if callback is set

Fixed (opted to just use an Assert).

+static DestReceiver spi_callbackDR = {
+    donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+    DestSPICallback
+};
Presumably that's a default destination you're then supposed to modify
with your own callbacks? There aren't any comments to indicate what's
going on here.

Correct. Added the following comment:

/*
 * This is strictly a starting point for creating a callback. It should not
 * actually be used.
 */
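For readers following along, here is a minimal sketch of the "copy the template, then override the hooks" idea. It is self-contained and does not use PostgreSQL headers: Receiver, DEST_CALLBACK, SumState and every function name below are hypothetical stand-ins for DestReceiver, DestSPICallback and a caller's state struct, not the real definitions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Simplified stand-in for DestReceiver; NOT the real PostgreSQL struct. */
typedef struct Receiver Receiver;
struct Receiver
{
    int         (*receive) (int value, Receiver *self);
    void        (*startup) (Receiver *self);
    void        (*shutdown) (Receiver *self);
    void        (*destroy) (Receiver *self);
    int         dest_kind;
};

/* Hypothetical tag standing in for DestSPICallback. */
enum { DEST_CALLBACK = 1 };

static int noop_receive(int value, Receiver *self)
{
    (void) value;
    (void) self;
    return 1;
}

static void noop_hook(Receiver *self)
{
    (void) self;
}

/* Template only: every hook does nothing until a caller overrides it. */
static const Receiver callback_template = {
    noop_receive, noop_hook, noop_hook, noop_hook, DEST_CALLBACK
};

/* Caller state embeds the receiver first, so the two pointers are castable. */
typedef struct
{
    Receiver    pub;
    long        sum;
} SumState;

static int sum_receive(int value, Receiver *self)
{
    ((SumState *) self)->sum += value;
    return 1;
}

/* Copy the template, then replace only the hooks this caller cares about. */
static void sum_init(SumState *state)
{
    assert(state != NULL);
    memcpy(&state->pub, &callback_template, sizeof(Receiver));
    state->pub.receive = sum_receive;
    state->sum = 0;
}

/* Feed the values 1..n through the receiver, as an executor would feed rows. */
static long sum_through_receiver(int n)
{
    SumState    state;
    int         i;

    sum_init(&state);
    state.pub.startup(&state.pub);
    for (i = 1; i <= n; i++)
        state.pub.receive(i, &state.pub);
    state.pub.shutdown(&state.pub);
    state.pub.destroy(&state.pub);
    return state.sum;
}
```

Calling sum_through_receiver(4) pushes four values through the overridden receive hook while the untouched hooks stay no-ops, which is the role the template receiver plays here.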



Comments on patch 2:

If this is the "bare minimum" patch, what is pending? Does it impose
any downsides or limits?

No limits. I'm not aware of any downsides.

It's "bare minimum" because a follow-on step is to allow different methods of returning results. In particular, my testing indicates that patch 1 + returning a dict of lists (as opposed to the current list of dicts) results in a 460% improvement vs ~30% with patch 2.

+/* Get switch execution contexts */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);

Comment makes no sense to me. This seems something like memory context
switch, where you supply the new and return the old. But the comment
doesn't explain it at all.

Comment changed to
/* Switch execution context (similar to MemoryContextSwitchTo) */
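As an illustration of the "supply the new one, get the old one back" convention, here is a minimal self-contained sketch. ExecContext, current_context and switch_context are hypothetical stand-ins, not the PL/Python definitions, and unlike the function in the patch this sketch does not raise an error when no context is active.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-in for PLyExecutionContext; NOT the real struct. */
typedef struct ExecContext
{
    const char *name;
} ExecContext;

static ExecContext *current_context = NULL;

/* Install new_ctx as current and return whichever context was current before. */
static ExecContext *switch_context(ExecContext *new_ctx)
{
    ExecContext *old = current_context;

    current_context = new_ctx;
    return old;
}

/* Returns 1 if a switch followed by a restore leaves things as they were. */
static int demo_switch(void)
{
    ExecContext outer = {"outer"};
    ExecContext inner = {"inner"};
    ExecContext *saved;
    int         ok;

    current_context = &outer;

    saved = switch_context(&inner);     /* enter the callback's context */
    ok = (saved == &outer) && (current_context == &inner);

    switch_context(saved);              /* restore on the way out */
    ok = ok && (current_context == &outer);

    current_context = NULL;             /* do not leave a dangling pointer */
    return ok;
}
```

The caller keeps the returned pointer only so it can hand it back later, which is the same shape as the save/restore pairs in the receive and startup callbacks.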

+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);

These are declared in the plpy_spi.c. Why aren't these static?

Derp. Fixed.

+    volatile MemoryContext oldcontext;
+    volatile ResourceOwner oldowner;
     int         rv;
-    volatile MemoryContext oldcontext;
-    volatile ResourceOwner oldowner;

Unnecessary code movement.

IMHO it reads better that way. I've left it for now so $COMMITTER can decide, but if it really bugs you let me know and I'll swap it.

In PLy_Callback_New, I think your use of a sub-context is sensible. Is
it necessary to palloc0 though?

Hrm, maybe not... but it seems like cheap insurance considering the amount of other stuff involved in just starting a new SPI call. And honestly, I'd rather not mess with it at this point. :) I have added an XXX comment though.

+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)

The code here looks like it could be part of spi.c's callback support,
rather than plpy_spi specific, since you provide a destroy callback in
the SPI callback struct.

I added this for use in PG_CATCH() blocks, because it's not clear to me that the portal gets cleaned up in those cases. It's certainly possible that it's pointless.

FWIW, I'm pretty sure I copied that pattern from somewhere else... probably plpgsql or pltcl.
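To make the cleanup-on-error concern concrete, here is a rough sketch that imitates the PG_TRY()/PG_CATCH() shape with plain setjmp/longjmp. Everything in it (Callback, callback_new, callback_free, run_query, execute, live_callbacks) is hypothetical and only mirrors the structure of the patch; it says nothing about whether the portal already does this cleanup.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdlib.h>

static jmp_buf error_jump;          /* stands in for the PG exception stack */
static int  live_callbacks = 0;     /* counts allocations not yet freed */

typedef struct
{
    int         rows;
} Callback;

static Callback *callback_new(void)
{
    Callback   *cb = calloc(1, sizeof(Callback));

    if (cb != NULL)
        live_callbacks++;
    return cb;
}

/* Frees the state and returns NULL so callers can write cb = callback_free(cb). */
static Callback *callback_free(Callback *cb)
{
    if (cb != NULL)
    {
        free(cb);
        live_callbacks--;
    }
    return NULL;
}

/* Pretend query: records three rows, then optionally "throws". */
static void run_query(Callback *cb, int fail)
{
    cb->rows = 3;
    if (fail)
        longjmp(error_jump, 1);
}

/* Returns the row count, or -1 on error; frees the callback on both paths. */
static int execute(int fail)
{
    /* Allocated before setjmp and never reassigned inside the "try" part. */
    Callback   *cb = callback_new();
    int         rows;

    if (cb == NULL)
        return -1;

    if (setjmp(error_jump) == 0)
    {
        run_query(cb, fail);        /* "PG_TRY" body */
    }
    else
    {
        callback_free(cb);          /* "PG_CATCH": explicit cleanup */
        return -1;
    }

    rows = cb->rows;
    cb = callback_free(cb);
    assert(cb == NULL);
    return rows;
}
```

If the error path skipped callback_free(), live_callbacks would stay above zero after a failed call, which is the leak the explicit free is guarding against.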

+    /* We need to store this because the TupleDesc the receive function gets has no names. */
+    myState->desc = typeinfo;

Is it safe to just store the pointer to the TupleDesc here? What's the lifetime?

Only Portal lifetime.

+     * will clean it up when the time is right. XXX This might result in a leak
+     * if an error happens and the result doesn't get dereferenced.
+     */
+    MemoryContextSwitchTo(TopMemoryContext);
+    result->tupdesc = CreateTupleDescCopy(typeinfo);

especially given this XXX comment...

I've changed the comment to the following. Hopefully this clarifies things:

        /*
         * Save tuple descriptor for later use by result set metadata
         * functions.  Save it in TopMemoryContext so that it survives outside of
         * an SPI context.  We trust that PLy_result_dealloc() will clean it up
         * when the time is right. The difference between result and everything
         * else is that result needs to survive after the portal is destroyed,
         * because result is what's handed back to the plpython function. While
         * it's tempting to use something other than TopMemoryContext, that won't
         * work: the user could potentially put result into the global dictionary,
         * which means it could survive as long as the session does.  This might
         * result in a leak if an error happens and the result doesn't get
         * dereferenced, but if that happens it means the python GC has failed us,
         * at which point we probably have bigger problems.
         *
         * This still isn't perfect though; if something the result tupledesc
         * references has its OID changed then the tupledesc will be invalid. I'm
         * not sure it's worth worrying about that though.
         */
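The lifetime argument in that comment (borrow the portal's descriptor for per-row work, but give the result its own copy) can be sketched in plain C. Desc, desc_create, desc_copy and desc_free are hypothetical stand-ins that use malloc/free instead of memory contexts; they are not TupleDesc or CreateTupleDescCopy(), and the sketch assumes at least one column.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for a tuple descriptor; NOT PostgreSQL's TupleDesc. */
typedef struct
{
    int         natts;
    char      **names;
} Desc;

static void desc_free(Desc *d)
{
    int         i;

    if (d == NULL)
        return;
    for (i = 0; i < d->natts; i++)
        free(d->names[i]);          /* free(NULL) is a no-op */
    free(d->names);
    free(d);
}

/* Build a descriptor owning private copies of every name (natts >= 1). */
static Desc *desc_create(int natts, const char *const *names)
{
    Desc       *d = calloc(1, sizeof(Desc));
    int         i;

    if (d == NULL)
        return NULL;
    d->names = calloc((size_t) natts, sizeof(char *));
    if (d->names == NULL)
    {
        free(d);
        return NULL;
    }
    d->natts = natts;
    for (i = 0; i < natts; i++)
    {
        size_t      len = strlen(names[i]) + 1;

        d->names[i] = malloc(len);
        if (d->names[i] == NULL)
        {
            desc_free(d);
            return NULL;
        }
        memcpy(d->names[i], names[i], len);
    }
    return d;
}

/* Deep copy: the result shares no memory with the source descriptor. */
static Desc *desc_copy(const Desc *src)
{
    return desc_create(src->natts, (const char *const *) src->names);
}

/* Returns 1 if the copy is still readable after its source is freed. */
static int copy_outlives_owner(void)
{
    const char *cols[] = {"id", "val"};
    Desc       *portal_desc = desc_create(2, cols);
    Desc       *result_desc;
    int         ok;

    assert(portal_desc != NULL);
    result_desc = desc_copy(portal_desc);
    assert(result_desc != NULL);

    desc_free(portal_desc);         /* the "portal" goes away */

    ok = result_desc->natts == 2 &&
        strcmp(result_desc->names[1], "val") == 0;
    desc_free(result_desc);         /* the "result" is released later */
    return ok;
}
```

A borrowed pointer would have been dangling after desc_free(portal_desc); the copy is what lets the result be used after the portal is gone.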

Updated patches attached, but I still need to update the docs.
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG                 http://OpenSCG.com
From 0a2ef661f55a047763a43b0eebd7483760e4a427 Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 5 Apr 2017 20:52:39 -0500
Subject: [PATCH 1/2] Add SPI_execute_callback

Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
 src/backend/executor/spi.c | 80 ++++++++++++++++++++++++++++++++++++++++------
 src/backend/tcop/dest.c    | 15 +++++++++
 src/include/executor/spi.h |  4 +++
 src/include/tcop/dest.h    |  1 +
 4 files changed, 90 insertions(+), 10 deletions(-)

diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index ca547dc6d9..4f6c3011f9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 tcount);
+                                 bool read_only, bool fire_triggers, uint64 tcount,
+                                 DestReceiver *callback);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
                                        Datum *Values, const char *Nulls);
@@ -321,7 +322,35 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
        res = _SPI_execute_plan(&plan, NULL,
                                                        InvalidSnapshot, InvalidSnapshot,
-                                                       read_only, true, tcount);
+                                                       read_only, true, tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+               DestReceiver *callback)
+{
+       _SPI_plan       plan;
+       int                     res;
+
+       if (src == NULL || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       memset(&plan, 0, sizeof(_SPI_plan));
+       plan.magic = _SPI_PLAN_MAGIC;
+       plan.cursor_options = 0;
+
+       _SPI_prepare_oneshot_plan(src, &plan);
+
+       res = _SPI_execute_plan(&plan, NULL,
+                                                       InvalidSnapshot, InvalidSnapshot,
+                                                       read_only, true, tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -355,7 +384,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                                                        _SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                                Values, Nulls),
                                                        InvalidSnapshot, InvalidSnapshot,
-                                                       read_only, true, tcount);
+                                                       read_only, true, tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+
+/* Execute a previously prepared plan with a callback */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+                                bool read_only, long tcount, DestReceiver *callback)
+{
+       int                     res;
+
+       if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       if (plan->nargs > 0 && Values == NULL)
+               return SPI_ERROR_PARAM;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       res = _SPI_execute_plan(plan,
+                                                       _SPI_convert_params(plan->nargs, plan->argtypes,
+                                                                                               Values, Nulls),
+                                                       InvalidSnapshot, InvalidSnapshot,
+                                                       read_only, true, tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -384,7 +440,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 
        res = _SPI_execute_plan(plan, params,
                                                        InvalidSnapshot, InvalidSnapshot,
-                                                       read_only, true, tcount);
+                                                       read_only, true, tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -425,7 +481,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
                                                        _SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                                Values, Nulls),
                                                        snapshot, crosscheck_snapshot,
-                                                       read_only, fire_triggers, tcount);
+                                                       read_only, fire_triggers, tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -472,7 +528,7 @@ SPI_execute_with_args(const char *src,
 
        res = _SPI_execute_plan(&plan, paramLI,
                                                        InvalidSnapshot, InvalidSnapshot,
-                                                       read_only, true, tcount);
+                                                       read_only, true, tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -1907,7 +1963,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 tcount)
+                                 bool read_only, bool fire_triggers, uint64 tcount,
+                                 DestReceiver *callback)
 {
        int                     my_res = 0;
        uint64          my_processed = 0;
@@ -1918,6 +1975,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
        ErrorContextCallback spierrcontext;
        CachedPlan *cplan = NULL;
        ListCell   *lc1;
+       DestReceiver *dest = callback;
 
        /*
         * Setup error traceback support for ereport()
@@ -2037,7 +2095,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                {
                        PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
                        bool            canSetTag = stmt->canSetTag;
-                       DestReceiver *dest;
 
                        _SPI_current->processed = 0;
                        _SPI_current->lastoid = InvalidOid;
@@ -2082,7 +2139,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                UpdateActiveSnapshotCommandId();
                        }
 
-                       dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+                       if (!callback)
+                               dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
 
                        if (stmt->utilityStmt == NULL)
                        {
@@ -2108,6 +2166,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                        {
                                char            completionTag[COMPLETION_TAG_BUFSIZE];
 
+                               Assert(!callback);
                                ProcessUtility(stmt,
                                                           plansource->query_string,
                                                           PROCESS_UTILITY_QUERY,
@@ -2281,7 +2340,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
        switch (operation)
        {
                case CMD_SELECT:
-                       if (queryDesc->dest->mydest != DestSPI)
+                       if (queryDesc->dest->mydest != DestSPI &&
+                                       queryDesc->dest->mydest != DestSPICallback)
                        {
                                /* Don't return SPI_OK_SELECT if we're discarding result */
                                res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..f68b6e1b51 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,15 @@ static DestReceiver spi_printtupDR = {
        DestSPI
 };
 
+/*
+ * This is strictly a starting point for creating a callback. It should not
+ * actually be used.
+ */
+static DestReceiver spi_callbackDR = {
+       donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+       DestSPICallback
+};
+
 /* Globally available receiver for DestNone */
 DestReceiver *None_Receiver = &donothingDR;
 
@@ -126,6 +135,9 @@ CreateDestReceiver(CommandDest dest)
                case DestSPI:
                        return &spi_printtupDR;
 
+               case DestSPICallback:
+                       return &spi_callbackDR;
+
                case DestTuplestore:
                        return CreateTuplestoreDestReceiver();
 
@@ -172,6 +184,7 @@ EndCommand(const char *commandTag, CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -216,6 +229,7 @@ NullCommand(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -262,6 +276,7 @@ ReadyForQuery(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 94a805d477..13719e1df5 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -80,11 +80,15 @@ extern PGDLLIMPORT int SPI_result;
 extern int     SPI_connect(void);
 extern int     SPI_finish(void);
 extern int     SPI_execute(const char *src, bool read_only, long tcount);
+extern int     SPI_execute_callback(const char *src, bool read_only, long tcount,
+                                                                       DestReceiver *callback);
 extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                                 bool read_only, long tcount);
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
                                                                ParamListInfo params,
                                                                bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+                                bool read_only, long tcount, DestReceiver *callback);
 extern int     SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                  long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
        DestRemoteExecute,                      /* sent to frontend, in Execute command */
        DestRemoteSimple,                       /* sent to frontend, w/no catalog access */
        DestSPI,                                        /* results sent to SPI manager */
+       DestSPICallback,                        /* results sent to user-specified callback function */
        DestTuplestore,                         /* results sent to Tuplestore */
        DestIntoRel,                            /* results sent to relation (SELECT INTO) */
        DestCopyOut,                            /* results sent to COPY TO code */
-- 
2.11.1

From d693fa42135e5f773cc8affcd26eba4d2ef22f2b Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 5 Apr 2017 21:30:39 -0500
Subject: [PATCH 2/2] Switch plpython to using SPI_execute_callback

This is a bare minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Simple testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
 src/pl/plpython/plpy_main.c |  13 ++
 src/pl/plpython/plpy_main.h |   3 +
 src/pl/plpython/plpy_spi.c  | 313 ++++++++++++++++++++++++++++++++------------
 3 files changed, 248 insertions(+), 81 deletions(-)

diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
        return PLy_execution_contexts;
 }
 
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+       PLyExecutionContext *last = PLy_execution_contexts;
+
+       if (PLy_execution_contexts == NULL)
+               elog(ERROR, "no Python function is currently executing");
+
+       PLy_execution_contexts = new;
+
+       return last;
+}
+
 MemoryContext
 PLy_get_scratch_context(PLyExecutionContext *context)
 {
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..fe30dbc14b 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
 /* Get the current execution context */
 extern PLyExecutionContext *PLy_current_execution_context(void);
 
+/* Switch execution context (similar to MemoryContextSwitchTo) */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
+
 /* Get the scratch memory context for specified execution context */
 extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
 
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index c6856ccbac..236cc6d998 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,9 +28,27 @@
 #include "plpy_procedure.h"
 #include "plpy_resultobject.h"
 
+typedef struct
+{
+       DestReceiver pub;
+       PLyExecutionContext *exec_ctx;
+       MemoryContext parent_ctx;
+       MemoryContext cb_ctx;
+       TupleDesc       desc;
+       PLyTypeInfo *args;
+
+       PyObject        *result;
+} CallbackState;
+
+static void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+static void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
                                                         uint64 rows, int status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
@@ -195,6 +213,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
 PyObject *
 PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 {
+       PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+       CallbackState   *callback;
        volatile int nargs;
        int                     i,
                                rv;
@@ -237,12 +257,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 
        oldcontext = CurrentMemoryContext;
        oldowner = CurrentResourceOwner;
+       callback = PLy_Callback_New(exec_ctx);
 
        PLy_spi_subtransaction_begin(oldcontext, oldowner);
 
        PG_TRY();
        {
-               PLyExecutionContext *exec_ctx = PLy_current_execution_context();
                char       *volatile nulls;
                volatile int j;
 
@@ -288,9 +308,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
                        }
                }
 
-               rv = SPI_execute_plan(plan->plan, plan->values, nulls,
-                                                         exec_ctx->curr_proc->fn_readonly, limit);
-               ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+               rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+                                                       exec_ctx->curr_proc->fn_readonly, limit,
+                                                       (DestReceiver *) callback);
+               ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
                if (nargs > 0)
                        pfree(nulls);
@@ -315,9 +336,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
                }
 
                PLy_spi_subtransaction_abort(oldcontext, oldowner);
+               PLy_Callback_Free(callback);
                return NULL;
        }
        PG_END_TRY();
+       callback = PLy_Callback_Free(callback);
 
        for (i = 0; i < nargs; i++)
        {
@@ -343,9 +366,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 static PyObject *
 PLy_spi_execute_query(char *query, long limit)
 {
-       int                     rv;
+       PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+       CallbackState   *callback = PLy_Callback_New(exec_ctx);
        volatile MemoryContext oldcontext;
        volatile ResourceOwner oldowner;
+       int                     rv;
        PyObject   *ret = NULL;
 
        oldcontext = CurrentMemoryContext;
@@ -355,20 +380,22 @@ PLy_spi_execute_query(char *query, long limit)
 
        PG_TRY();
        {
-               PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
                pg_verifymbstr(query, strlen(query), false);
-               rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
-               ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+               rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+                               (DestReceiver *) callback);
+
+               ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
                PLy_spi_subtransaction_commit(oldcontext, oldowner);
        }
        PG_CATCH();
        {
                PLy_spi_subtransaction_abort(oldcontext, oldowner);
+               PLy_Callback_Free(callback);
                return NULL;
        }
        PG_END_TRY();
+       callback = PLy_Callback_Free(callback);
 
        if (rv < 0)
        {
@@ -382,94 +409,218 @@ PLy_spi_execute_query(char *query, long limit)
        return ret;
 }
 
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
+{
+       MemoryContext oldcontext, cb_ctx;
+       CallbackState *callback;
+
+       /* XXX does this really need palloc0? */
+       callback = palloc0(sizeof(CallbackState));
+
+       /*
+        * Use a new context to make cleanup easier. Allocate it in the current
+        * context so we don't have to worry about cleaning it up if there's an
+        * error.
+        */
+       cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+                                                               "PL/Python callback context",
+                                                               ALLOCSET_DEFAULT_SIZES);
+
+       oldcontext = MemoryContextSwitchTo(cb_ctx);
+       callback->parent_ctx = oldcontext;
+       callback->cb_ctx = cb_ctx;
+       memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+       callback->pub.receiveSlot = PLy_CSreceive;
+       callback->pub.rStartup = PLy_CSStartup;
+       callback->pub.rDestroy = PLy_CSDestroy;
+       callback->exec_ctx = exec_ctx;
+
+       MemoryContextSwitchTo(oldcontext);
+
+       return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+       if (callback)
+       {
+               if (callback->cb_ctx)
+                       (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+               pfree(callback);
+       }
+
+       return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
 {
+       MemoryContext oldctx;
+
+       /* The result info needs to be in the parent context */
+       oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+       myState->result = PLy_result_new();
+       if (myState->result == NULL)
+               PLy_elog(ERROR, "could not create new result object");
+
+       MemoryContextSwitchTo(oldctx);
+       return (PLyResultObject *) myState->result;
+}
+
+static void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+       PLyExecutionContext *old_exec_ctx;
+       CallbackState *myState = (CallbackState *) self;
        PLyResultObject *result;
-       volatile MemoryContext oldcontext;
+       PLyTypeInfo *args;
+       MemoryContext mctx, old_mctx;
+
+       /*
+        * We may be in a different execution context when we're called, so switch
+        * back to our original one.
+        */
+       mctx = myState->cb_ctx;
+       old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+       old_mctx = MemoryContextSwitchTo(mctx);
+
+       /*
+        * We need to store this because the TupleDesc that the receive function
+        * gets has no names.
+        */
+       myState->desc = typeinfo;
+
+       /* Setup type conversion info */
+       myState->args = args = palloc0(sizeof(PLyTypeInfo));
+       PLy_typeinfo_init(args, mctx);
+       PLy_input_tuple_funcs(args, typeinfo);
+
+       /* result is actually myState.result */
+       result = PLyCSNewResult(myState);
+
+       /*
+        * Save tuple descriptor for later use by result set metadata
+        * functions.  Save it in TopMemoryContext so that it survives outside of
+        * an SPI context.  We trust that PLy_result_dealloc() will clean it up
+        * when the time is right. The difference between result and everything
+        * else is that result needs to survive after the portal is destroyed,
+        * because result is what's handed back to the plpython function. While
+        * it's tempting to use something other than TopMemoryContext, that won't
+        * work: the user could potentially put result into the global dictionary,
+        * which means it could survive as long as the session does.  This might
+        * result in a leak if an error happens and the result doesn't get
+        * dereferenced, but if that happens it means the python GC has failed us,
+        * at which point we probably have bigger problems.
+        *
+        * This still isn't perfect though; if something the result tupledesc
+        * references has its OID changed then the tupledesc will be invalid. I'm
+        * not sure it's worth worrying about that though.
+        */
+       MemoryContextSwitchTo(TopMemoryContext);
+       result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+       MemoryContextSwitchTo(old_mctx);
+       PLy_switch_execution_context(old_exec_ctx);
+}
+
+static void
+PLy_CSDestroy(DestReceiver *self)
+{
+       CallbackState *myState = (CallbackState *) self;
+       MemoryContext cb_ctx = myState->cb_ctx;
 
-       result = (PLyResultObject *) PLy_result_new();
-       Py_DECREF(result->status);
-       result->status = PyInt_FromLong(status);
+       MemoryContextDelete(cb_ctx);
+       myState->cb_ctx = 0;
+}
 
-       if (status > 0 && tuptable == NULL)
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+       CallbackState   *myState = (CallbackState *) self;
+       TupleDesc               desc = myState->desc;
+       PLyTypeInfo     *args = myState->args;
+       PLyResultObject *result = (PLyResultObject *) myState->result;
+       PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+       MemoryContext   scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+       MemoryContext   oldcontext = CurrentMemoryContext;
+       int                     rv = 1;
+       PyObject   *row = NULL;
+
+       /* Verify saved state matches incoming slot */
+       Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+       Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+       /* Make sure the tuple is fully deconstructed */
+       slot_getallattrs(slot);
+
+       /*
+        * Do the work in the scratch context to avoid leaking memory from the
+        * datatype output function calls.
+        */
+       MemoryContextSwitchTo(scratch_context);
+
+       PG_TRY();
        {
-               Py_DECREF(result->nrows);
-               result->nrows = (rows > (uint64) LONG_MAX) ?
-                       PyFloat_FromDouble((double) rows) :
-                       PyInt_FromLong((long) rows);
+               row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
        }
-       else if (status > 0 && tuptable != NULL)
+       PG_CATCH();
        {
-               PLyTypeInfo args;
-               MemoryContext cxt;
+               Py_XDECREF(row);
+               MemoryContextSwitchTo(oldcontext);
+               PLy_switch_execution_context(old_exec_ctx);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
 
-               Py_DECREF(result->nrows);
-               result->nrows = (rows > (uint64) LONG_MAX) ?
-                       PyFloat_FromDouble((double) rows) :
-                       PyInt_FromLong((long) rows);
+       /*
+        * If we tried to do this in the PG_CATCH we'd have to mark row
+        * as volatile, but that won't work with PyList_Append, so just
+        * test the error code after doing Py_DECREF().
+        */
+       if (row)
+       {
+               rv = PyList_Append(result->rows, row);
+               Py_DECREF(row);
+       }
 
-               cxt = AllocSetContextCreate(CurrentMemoryContext,
-                                                                       "PL/Python temp context",
-                                                                       ALLOCSET_DEFAULT_SIZES);
-               PLy_typeinfo_init(&args, cxt);
+       if (rv != 0)
+               ereport(ERROR,
+                               (errmsg("unable to append value to list")));
 
-               oldcontext = CurrentMemoryContext;
-               PG_TRY();
-               {
-                       MemoryContext oldcontext2;
+       MemoryContextSwitchTo(oldcontext);
+       MemoryContextReset(scratch_context);
+       PLy_switch_execution_context(old_exec_ctx);
 
-                       if (rows)
-                       {
-                               uint64          i;
-
-                               /*
-                                * PyList_New() and PyList_SetItem() use Py_ssize_t for list
-                                * size and list indices; so we cannot support a result larger
-                                * than PY_SSIZE_T_MAX.
-                                */
-                               if (rows > (uint64) PY_SSIZE_T_MAX)
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                                                        errmsg("query result has too many rows to fit in a Python list")));
-
-                               Py_DECREF(result->rows);
-                               result->rows = PyList_New(rows);
-
-                               PLy_input_tuple_funcs(&args, tuptable->tupdesc);
-                               for (i = 0; i < rows; i++)
-                               {
-                                       PyObject   *row = PLyDict_FromTuple(&args,
-                                                                                                               tuptable->vals[i],
-                                                                                                               tuptable->tupdesc);
+       return true;
+}
 
-                                       PyList_SetItem(result->rows, i, row);
-                               }
-                       }
 
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+       PLyResultObject *result = (PLyResultObject *) callback->result;
+
+       /* If status < 0 this stuff would just get thrown away anyway. */
+       if (status > 0)
+       {
+               if (!result)
+               {
                        /*
-                        * Save tuple descriptor for later use by result set metadata
-                        * functions.  Save it in TopMemoryContext so that it survives
-                        * outside of an SPI context.  We trust that PLy_result_dealloc()
-                        * will clean it up when the time is right.  (Do this as late as
-                        * possible, to minimize the number of ways the tupdesc could get
-                        * leaked due to errors.)
+                        * This happens if the command returned no results. Create a dummy result set.
                         */
-                       oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
-                       result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
-                       MemoryContextSwitchTo(oldcontext2);
+                       result = PLyCSNewResult(callback);
+                       callback->result = (PyObject *) result;
                }
-               PG_CATCH();
-               {
-                       MemoryContextSwitchTo(oldcontext);
-                       MemoryContextDelete(cxt);
-                       Py_DECREF(result);
-                       PG_RE_THROW();
-               }
-               PG_END_TRY();
 
-               MemoryContextDelete(cxt);
-               SPI_freetuptable(tuptable);
+               Py_DECREF(result->status);
+               result->status = PyInt_FromLong(status);
+               Py_DECREF(result->nrows);
+               result->nrows = (rows > (uint64) LONG_MAX) ?
+                       PyFloat_FromDouble((double) rows) :
+                       PyInt_FromLong((long) rows);
        }
 
        return (PyObject *) result;
-- 
2.11.1
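
A side note on the "mark row as volatile" comment in the row callback above: PG_TRY/PG_CATCH are built on sigsetjmp/longjmp, so a local that is modified inside the try block and then read in the catch block has to be volatile to have a defined value there. The sketch below illustrates that rule with plain setjmp/longjmp outside of PostgreSQL and Python. The names make_row and fetch_one are hypothetical stand-ins, not functions from the patch, and malloc/free stand in for creating and releasing a row object.

```c
#include <setjmp.h>
#include <stdlib.h>

static jmp_buf env;

/* Hypothetical stand-in for a call that can "throw" (longjmp) on error. */
static char *
make_row(int fail)
{
	if (fail)
		longjmp(env, 1);
	return malloc(16);
}

/* Returns 0 if a row was built and released, -1 if make_row "threw". */
static int
fetch_one(int fail)
{
	/*
	 * volatile because it is modified after setjmp() and read after a
	 * possible longjmp(); starting at NULL keeps the cleanup path safe.
	 */
	char	   *volatile row = NULL;
	int			rv = -1;

	if (setjmp(env) == 0)
	{
		/* "try" block */
		row = make_row(fail);
	}
	else
	{
		/* "catch" block: row is either NULL or a valid pointer */
		free(row);
		return -1;
	}

	/* Normal path: consume the row, then drop our reference to it. */
	if (row)
	{
		rv = 0;
		free(row);
	}
	return rv;
}
```

With these definitions, fetch_one(0) returns 0 and fetch_one(1) returns -1. The patch takes the other route: it keeps row non-volatile and does the append after PG_END_TRY, which only works if row has a known value on every path.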
