On 1/24/17 10:43 PM, Jim Nasby wrote:

I strongly suggest making this design effort a separate thread, and
focusing on the SPI improvements that give "free" no-user-action
performance boosts here.

Fair enough. I posted the SPI portion of that yesterday. That should be
useful for pl/R and possibly pl/perl. pl/tcl could make use of it, but
it would end up executing arbitrary tcl code in the middle of portal
execution, which doesn't strike me as a great idea. Unfortunately, I
don't think plpgsql could make much use of this for similar reasons.

I'll post a plpython patch that doesn't add the output format control.

I've attached the results of that. Unfortunately the speed improvement is only 27% at this point (with 9999999 tuples). Presumably that's because it's constructing a brand new dictionary from scratch for each tuple.
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
>From 7ef3e944c1ee8266d70fafae080afc6beb492102 Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 25 Jan 2017 12:57:40 -0600
Subject: [PATCH 1/2] Add SPI_execute_callback() and callback-based
 DestReceiver.

Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
 src/backend/executor/spi.c | 76 ++++++++++++++++++++++++++++++++++++++++------
 src/backend/tcop/dest.c    | 11 +++++++
 src/include/executor/spi.h |  4 +++
 src/include/tcop/dest.h    |  1 +
 4 files changed, 83 insertions(+), 9 deletions(-)

diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 55f97b14e6..d55e06509f 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, 
SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount);
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
                                        Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
        res = _SPI_execute_plan(&plan, NULL,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+               DestReceiver *callback)
+{
+       _SPI_plan       plan;
+       int                     res;
+
+       if (src == NULL || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       memset(&plan, 0, sizeof(_SPI_plan));
+       plan.magic = _SPI_PLAN_MAGIC;
+       plan.cursor_options = 0;
+
+       _SPI_prepare_oneshot_plan(src, &plan);
+
+       res = _SPI_execute_plan(&plan, NULL,
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback)
+{
+       int                     res;
+
+       if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       if (plan->nargs > 0 && Values == NULL)
+               return SPI_ERROR_PARAM;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       res = _SPI_execute_plan(plan,
+                                                       
_SPI_convert_params(plan->nargs, plan->argtypes,
+                                                                               
                Values, Nulls),
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, 
ParamListInfo params,
 
        res = _SPI_execute_plan(plan, params,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        snapshot, 
crosscheck_snapshot,
-                                                       read_only, 
fire_triggers, tcount);
+                                                       read_only, 
fire_triggers, tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
 
        res = _SPI_execute_plan(&plan, paramLI,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr 
plan)
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount)
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback)
 {
        int                     my_res = 0;
        uint64          my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
        ErrorContextCallback spierrcontext;
        CachedPlan *cplan = NULL;
        ListCell   *lc1;
+       DestReceiver *dest = callback;
 
        /*
         * Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                {
                        PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
                        bool            canSetTag = stmt->canSetTag;
-                       DestReceiver *dest;
 
                        _SPI_current->processed = 0;
                        _SPI_current->lastoid = InvalidOid;
@@ -2065,7 +2121,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                UpdateActiveSnapshotCommandId();
                        }
 
-                       dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
+                       if (!callback)
+                               dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
 
                        if (stmt->utilityStmt == NULL)
                        {
@@ -2090,6 +2147,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                        {
                                char            
completionTag[COMPLETION_TAG_BUFSIZE];
 
+                               // XXX throw error if callback is set
                                ProcessUtility(stmt,
                                                           
plansource->query_string,
                                                           
PROCESS_UTILITY_QUERY,
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..bd671e0b26 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,11 @@ static DestReceiver spi_printtupDR = {
        DestSPI
 };
 
+static DestReceiver spi_callbackDR = {  /* stock receiver returned for DestSPICallback; callers copy it and install their own hooks (as PL/Python's PLy_Callback_New does) */
+       donothingReceive, donothingStartup, donothingCleanup, donothingCleanup, /* placeholder donothing* routines until overridden */
+       DestSPICallback
+};
+
 /* Globally available receiver for DestNone */
 DestReceiver *None_Receiver = &donothingDR;
 
@@ -126,6 +131,9 @@ CreateDestReceiver(CommandDest dest)
                case DestSPI:
                        return &spi_printtupDR;
 
+               case DestSPICallback:
+                       return &spi_callbackDR;
+
                case DestTuplestore:
                        return CreateTuplestoreDestReceiver();
 
@@ -172,6 +180,7 @@ EndCommand(const char *commandTag, CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -216,6 +225,7 @@ NullCommand(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -262,6 +272,7 @@ ReadyForQuery(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index a18ae63245..d779511130 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
 extern int     SPI_connect(void);
 extern int     SPI_finish(void);
 extern int     SPI_execute(const char *src, bool read_only, long tcount);
+extern int     SPI_execute_callback(const char *src, bool read_only, long 
tcount,
+                                                                       
DestReceiver *callback);
 extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                                 bool read_only, long tcount);
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
                                                                ParamListInfo 
params,
                                                                bool read_only, 
long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback);
 extern int     SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                  long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
        DestRemoteExecute,                      /* sent to frontend, in Execute 
command */
        DestRemoteSimple,                       /* sent to frontend, w/no 
catalog access */
        DestSPI,                                        /* results sent to SPI 
manager */
+       DestSPICallback,                        /* results sent to 
user-specified callback function */
        DestTuplestore,                         /* results sent to Tuplestore */
        DestIntoRel,                            /* results sent to relation 
(SELECT INTO) */
        DestCopyOut,                            /* results sent to COPY TO code 
*/
-- 
2.11.1

>From 0fb68b2e2649ab81fae8c86c45c1522f1b0d56ab Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Tue, 28 Feb 2017 21:54:45 -0600
Subject: [PATCH 2/2] Minimal adoption of SPI callbacks in plpython

---
 src/pl/plpython/plpy_main.c |  13 ++
 src/pl/plpython/plpy_main.h |   3 +
 src/pl/plpython/plpy_spi.c  | 294 ++++++++++++++++++++++++++++++++------------
 3 files changed, 231 insertions(+), 79 deletions(-)

diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
        return PLy_execution_contexts;
 }
 
+/*
+ * Make "new" the current PL/Python execution context and hand back the one
+ * that was current before the call, so the caller can restore it later.
+ *
+ * Raises an error if no execution context is active at all.
+ */
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+       PLyExecutionContext *previous;
+
+       /* Refuse to switch when nothing is executing */
+       if (PLy_execution_contexts == NULL)
+               elog(ERROR, "no Python function is currently executing");
+
+       previous = PLy_execution_contexts;
+       PLy_execution_contexts = new;
+
+       return previous;
+}
+
 MemoryContext
 PLy_get_scratch_context(PLyExecutionContext *context)
 {
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..7cbe0a8d35 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
 /* Get the current execution context */
 extern PLyExecutionContext *PLy_current_execution_context(void);
 
+/* Switch to a different execution context, returning the previous one */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext 
*new);
+
 /* Get the scratch memory context for specified execution context */
 extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
 
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..6f48e70329 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,10 +28,30 @@
 #include "plpy_procedure.h"
 #include "plpy_resultobject.h"
 
+typedef struct                         /* per-query state for the callback-based SPI DestReceiver */
+{
+       DestReceiver pub;               /* must stay first: the struct is cast to and from DestReceiver * */
+       PLyExecutionContext *exec_ctx;  /* execution context captured at creation; the hooks switch to it */
+       MemoryContext parent_ctx;       /* context that was current at creation; result objects are made here */
+       MemoryContext cb_ctx;           /* private context made by PLy_Callback_New, deleted by PLy_CSDestroy */
+       TupleDesc       desc;           /* tuple descriptor saved by PLy_CSStartup */
+       PLyTypeInfo *args;              /* tuple-to-Python conversion info set up by PLy_CSStartup */
+
+       PyObject        *result;        /* PLyResultObject built by PLyCSNewResult; NULL until then */
+} CallbackState;
+
 
+
+/*
+ * Forward declarations for the callback DestReceiver hooks and helpers.
+ * All of them are private to this file, so every one gets internal linkage.
+ */
+static void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+static void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
+ 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
 static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState    *callback,
                                                         uint64 rows, int 
status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
@@ -196,6 +216,7 @@ PLy_spi_execute(PyObject *self, PyObject *args)
 static PyObject *
 PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 {
+       CallbackState   *callback;
        volatile int nargs;
        int                     i,
                                rv;
@@ -289,9 +310,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
                        }
                }
 
-               rv = SPI_execute_plan(plan->plan, plan->values, nulls,
-                                                         
exec_ctx->curr_proc->fn_readonly, limit);
-               ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, 
rv);
+               callback = PLy_Callback_New(exec_ctx);
+               rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+                                                       
exec_ctx->curr_proc->fn_readonly, limit,
+                                                       (DestReceiver *) 
callback);
+               ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
                if (nargs > 0)
                        pfree(nulls);
@@ -316,9 +339,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
                }
 
                PLy_spi_subtransaction_abort(oldcontext, oldowner);
+               PLy_Callback_Free(callback);
                return NULL;
        }
        PG_END_TRY();
+       callback = PLy_Callback_Free(callback);
 
        for (i = 0; i < nargs; i++)
        {
@@ -344,6 +369,8 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
 static PyObject *
 PLy_spi_execute_query(char *query, long limit)
 {
+       PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+       CallbackState   *callback;
        int                     rv;
        volatile MemoryContext oldcontext;
        volatile ResourceOwner oldowner;
@@ -356,20 +383,23 @@ PLy_spi_execute_query(char *query, long limit)
 
        PG_TRY();
        {
-               PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
                pg_verifymbstr(query, strlen(query), false);
-               rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, 
limit);
-               ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, 
rv);
+               callback = PLy_Callback_New(exec_ctx);
+               rv = SPI_execute_callback(query, 
exec_ctx->curr_proc->fn_readonly, limit,
+                               (DestReceiver *) callback);
+
+               ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
                PLy_spi_subtransaction_commit(oldcontext, oldowner);
        }
        PG_CATCH();
        {
                PLy_spi_subtransaction_abort(oldcontext, oldowner);
+               PLy_Callback_Free(callback);
                return NULL;
        }
        PG_END_TRY();
+       callback = PLy_Callback_Free(callback);
 
        if (rv < 0)
        {
@@ -383,94 +413,200 @@ PLy_spi_execute_query(char *query, long limit)
        return ret;
 }
 
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+/*
+ * Build a CallbackState whose embedded DestReceiver routes tuples to the
+ * PLy_CS* hooks.
+ *
+ * The struct is palloc'd in the caller's current memory context, and that
+ * context is remembered as parent_ctx.  exec_ctx is stored so the hooks can
+ * switch back to it when they run.
+ */
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
 {
+       CallbackState *state = palloc0(sizeof(CallbackState));
+       MemoryContext private_ctx;
+       MemoryContext caller_ctx;
+
+       /*
+        * A dedicated context makes cleanup easier.  It is created under the
+        * current context so we don't have to worry about cleaning it up if
+        * there's an error.
+        */
+       private_ctx = AllocSetContextCreate(CurrentMemoryContext,
+                                                               "PL/Python callback context",
+                                                               ALLOCSET_DEFAULT_SIZES);
+
+       caller_ctx = MemoryContextSwitchTo(private_ctx);
+
+       /* Start from the stock DestSPICallback receiver, then install our hooks */
+       state->pub = *CreateDestReceiver(DestSPICallback);
+       state->pub.receiveSlot = PLy_CSreceive;
+       state->pub.rStartup = PLy_CSStartup;
+       state->pub.rDestroy = PLy_CSDestroy;
+
+       state->exec_ctx = exec_ctx;
+       state->parent_ctx = caller_ctx;
+       state->cb_ctx = private_ctx;
+
+       MemoryContextSwitchTo(caller_ctx);
+
+       return state;
+}
+
+/*
+ * Release a CallbackState obtained from PLy_Callback_New.
+ *
+ * If the private memory context still exists, the receiver's rDestroy hook
+ * is run first; the struct itself is then pfree'd.  A NULL argument is
+ * accepted.  Always returns NULL, so callers can write
+ * "cb = PLy_Callback_Free(cb)".
+ */
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+       if (callback == NULL)
+               return NULL;
+
+       if (callback->cb_ctx != NULL)
+               callback->pub.rDestroy(&callback->pub);
+
+       pfree(callback);
+
+       return NULL;
+}
+
+/*
+ * Create a fresh result object for this callback and record it in
+ * myState->result.  Raises an error if the object cannot be created.
+ */
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
+{
+       /* The result info needs to live in the parent context */
+       MemoryContext saved_ctx = MemoryContextSwitchTo(myState->parent_ctx);
+       PyObject   *fresh = PLy_result_new();
+
+       myState->result = fresh;
+       if (fresh == NULL)
+               PLy_elog(ERROR, "could not create new result object");
+
+       MemoryContextSwitchTo(saved_ctx);
+
+       return (PLyResultObject *) fresh;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+       PLyExecutionContext *old_exec_ctx;
+       CallbackState *myState = (CallbackState *) self;
        PLyResultObject *result;
-       volatile MemoryContext oldcontext;
+       PLyTypeInfo *args;
+       MemoryContext mctx, old_mctx;
 
-       result = (PLyResultObject *) PLy_result_new();
-       Py_DECREF(result->status);
-       result->status = PyInt_FromLong(status);
+       /*
+        * rStartup hook of the callback receiver: remember the tuple descriptor,
+        * build the type conversion info in the callback's private memory
+        * context, and create the result object.  "operation" is not used.
+        *
+        * We may be in a different execution context when we're called, so
+        * switch back to our original one.  The previous execution context and
+        * memory context are restored at the end of the function.
+        */
+       mctx = myState->cb_ctx;
+       old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+       old_mctx = MemoryContextSwitchTo(mctx);
+       
+       /*
+        * We need to store this because the TupleDesc the receive function gets
+        * has no names.
+        */
+       myState->desc = typeinfo;
+
+       /* Set up type conversion info; it is allocated in the private context */
+       myState->args = args = palloc0(sizeof(PLyTypeInfo));
+       PLy_typeinfo_init(args, mctx);
+       PLy_input_tuple_funcs(args, typeinfo);
+
+       result = PLyCSNewResult(myState);
+       
+       /*
+        * Save tuple descriptor for later use by result set metadata
+        * functions.  Save it in TopMemoryContext so that it survives
+        * outside of an SPI context.  We trust that PLy_result_dealloc()
+        * will clean it up when the time is right.  XXX This might result in a
+        * leak if an error happens and the result doesn't get dereferenced.
+        */
+       MemoryContextSwitchTo(TopMemoryContext);
+       result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+       MemoryContextSwitchTo(old_mctx);
+       PLy_switch_execution_context(old_exec_ctx);
+}
 
-       if (status > 0 && tuptable == NULL)
+/*
+ * rDestroy hook of the callback receiver: delete the callback's private
+ * memory context and clear the pointer to it.
+ */
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+       CallbackState *state = (CallbackState *) self;
+
+       MemoryContextDelete(state->cb_ctx);
+       state->cb_ctx = NULL;
+}
+
+/*
+ * receiveSlot hook of the callback receiver: convert one result tuple into
+ * a Python dictionary and append it to the row list of the result object.
+ *
+ * The work is done under the execution context saved in the CallbackState
+ * and in that context's scratch memory context.  Both are put back before
+ * returning or raising an error.  Always returns true.
+ */
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+       TupleDesc               slotdesc = slot->tts_tupleDescriptor;
+       CallbackState   *myState = (CallbackState *) self;
+       TupleDesc               desc = myState->desc;
+       PLyTypeInfo     *args = myState->args;
+       PLyResultObject *result = (PLyResultObject *) myState->result;
+       PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+       MemoryContext   scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+       MemoryContext   oldcontext = CurrentMemoryContext;
+       int                     rv;
+       PyObject   *row = NULL;        /* PG_CATCH reads this, so it must start out NULL */
+
+       /* Verify saved state matches incoming slot */
+       Assert(desc->tdtypeid == slotdesc->tdtypeid);
+       Assert(args->in.r.natts == slotdesc->natts);
+
+       /* Make sure the tuple is fully deconstructed */
+       slot_getallattrs(slot);
+
+       /*
+        * Do the work in the scratch context to avoid leaking memory from the
+        * datatype output function calls.
+        */
+       MemoryContextSwitchTo(scratch_context);
+
+       PG_TRY();
        {
-               Py_DECREF(result->nrows);
-               result->nrows = (rows > (uint64) LONG_MAX) ?
-                       PyFloat_FromDouble((double) rows) :
-                       PyInt_FromLong((long) rows);
+               row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
        }
-       else if (status > 0 && tuptable != NULL)
+       PG_CATCH();
        {
-               PLyTypeInfo args;
-               MemoryContext cxt;
+               /* row is still NULL here unless the conversion had returned */
+               Py_XDECREF(row);
+               MemoryContextSwitchTo(oldcontext);
+               PLy_switch_execution_context(old_exec_ctx);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
 
-               Py_DECREF(result->nrows);
-               result->nrows = (rows > (uint64) LONG_MAX) ?
-                       PyFloat_FromDouble((double) rows) :
-                       PyInt_FromLong((long) rows);
+       /*
+        * Append outside the PG_TRY block: doing it inside would require
+        * declaring row as volatile, and that won't work with PyList_Append.
+        * PyList_Append takes its own reference on success, so drop ours
+        * either way and test the result code afterwards.
+        */
+       rv = PyList_Append(result->rows, row);
+       Py_DECREF(row);
+       
+       if (rv != 0)
+       {
+               /* Undo the context switches before raising, as PG_CATCH does */
+               MemoryContextSwitchTo(oldcontext);
+               PLy_switch_execution_context(old_exec_ctx);
+               ereport(ERROR,
+                               (errmsg("unable to append value to list")));
+       }
 
-               cxt = AllocSetContextCreate(CurrentMemoryContext,
-                                                                       
"PL/Python temp context",
-                                                                       
ALLOCSET_DEFAULT_SIZES);
-               PLy_typeinfo_init(&args, cxt);
+       MemoryContextSwitchTo(oldcontext);
+       MemoryContextReset(scratch_context);
+       PLy_switch_execution_context(old_exec_ctx);
 
-               oldcontext = CurrentMemoryContext;
-               PG_TRY();
-               {
-                       MemoryContext oldcontext2;
+       return true;
+}
 
-                       if (rows)
-                       {
-                               uint64          i;
-
-                               /*
-                                * PyList_New() and PyList_SetItem() use 
Py_ssize_t for list
-                                * size and list indices; so we cannot support 
a result larger
-                                * than PY_SSIZE_T_MAX.
-                                */
-                               if (rows > (uint64) PY_SSIZE_T_MAX)
-                                       ereport(ERROR,
-                                                       
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                                                        errmsg("query result 
has too many rows to fit in a Python list")));
-
-                               Py_DECREF(result->rows);
-                               result->rows = PyList_New(rows);
-
-                               PLy_input_tuple_funcs(&args, tuptable->tupdesc);
-                               for (i = 0; i < rows; i++)
-                               {
-                                       PyObject   *row = 
PLyDict_FromTuple(&args,
-                                                                               
                                tuptable->vals[i],
-                                                                               
                                tuptable->tupdesc);
 
-                                       PyList_SetItem(result->rows, i, row);
-                               }
-                       }
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+       PLyResultObject *result = (PLyResultObject *) callback->result;
 
+       /* If status < 0 this stuff would just get thrown away anyway. */
+       if (status > 0)
+       {
+               if (!result)
+               {
                        /*
-                        * Save tuple descriptor for later use by result set 
metadata
-                        * functions.  Save it in TopMemoryContext so that it 
survives
-                        * outside of an SPI context.  We trust that 
PLy_result_dealloc()
-                        * will clean it up when the time is right.  (Do this 
as late as
-                        * possible, to minimize the number of ways the tupdesc 
could get
-                        * leaked due to errors.)
+                        * This happens if the command returned no results. 
Create a dummy result set.
                         */
-                       oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
-                       result->tupdesc = 
CreateTupleDescCopy(tuptable->tupdesc);
-                       MemoryContextSwitchTo(oldcontext2);
-               }
-               PG_CATCH();
-               {
-                       MemoryContextSwitchTo(oldcontext);
-                       MemoryContextDelete(cxt);
-                       Py_DECREF(result);
-                       PG_RE_THROW();
+                       result = PLyCSNewResult(callback);
+                       callback->result = (PyObject *) result;
                }
-               PG_END_TRY();
 
-               MemoryContextDelete(cxt);
-               SPI_freetuptable(tuptable);
+               Py_DECREF(result->status);
+               result->status = PyInt_FromLong(status);
+               Py_DECREF(result->nrows);
+               result->nrows = (rows > (uint64) LONG_MAX) ?
+                       PyFloat_FromDouble((double) rows) :
+                       PyInt_FromLong((long) rows);
        }
 
        return (PyObject *) result;
-- 
2.11.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to