In the attached patch (snippet below), I'm seeing something strange with args->in.r.atts[]. Prior to entering the PG_TRY block, I can inspect things in lldb just fine:

(lldb) call args->in.r.atts[0].func
(PLyDatumToObFunc) $49 = 0x000000010fc4dc70 (plpython3.so`PLyString_FromDatum at plpy_typeio.c:621)
(lldb) call &args->in.r.atts[0]
(PLyDatumToOb *) $50 = 0x00007fd2b302f6d0
(lldb) call args->in.r.atts[0]
(PLyDatumToOb) $51 = {
func = 0x000000010fc4dc70 (plpython3.so`PLyString_FromDatum at plpy_typeio.c:621)
  typfunc = {
    fn_addr = 0x000000010f478b50 (postgres`textout at varlena.c:521)
    fn_oid = 47
...

But I'm getting a EXC_BAD_ACCESS (code=1, address=0xb302f6d0) on the last if in the snippet below. Looking at the variables again, I see:

(lldb) call args->in.r.atts[i].func
error: Couldn't apply expression side effects : Couldn't dematerialize a result variable: couldn't read its memory
(lldb) call args->in.r.atts[i]
error: Couldn't apply expression side effects : Couldn't dematerialize a result variable: couldn't read its memory

I saw the comment on PG_TRY about marking things as volatile, but my understanding from the comment is I shouldn't even need to do that, since these variables aren't being modified.

static bool
PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
{
        volatile TupleDesc      desc = slot->tts_tupleDescriptor;
        volatile CallbackState *myState = (CallbackState *) self;
        volatile PLyTypeInfo *args = myState->args;

        PLyExecutionContext *old_exec_ctx = 
PLy_switch_execution_context(myState->exec_ctx);
        MemoryContext scratch_context = 
PLy_get_scratch_context(myState->exec_ctx);
        MemoryContext oldcontext = CurrentMemoryContext;

        /* Verify saved state matches incoming slot */
        Assert(myState->desc == desc);
        Assert(args->in.r.natts == desc->natts);

        /* Make sure the tuple is fully deconstructed */
        slot_getallattrs(slot);

        MemoryContextSwitchTo(scratch_context);

        PG_TRY();
        {
                int                     i, rv;

                /*
                 * Do the work in the scratch context to avoid leaking memory 
from the
                 * datatype output function calls.
                 */
                for (i = 0; i < desc->natts; i++)
                {
                        PyObject   *value = NULL;

                        if (desc->attrs[i]->attisdropped)
                                continue;

                        if (myState->lists[i] == NULL)
                                ereport(ERROR,
                                                (errmsg("missing list for attribute 
%d", i)));
                        /* XXX If the function can't be null, ditch that check 
*/
                        if (slot->tts_isnull[i] || args->in.r.atts[i].func == 
NULL)
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 80fc4c4725..ace30d75f0 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, 
SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount);
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
                                        Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
        res = _SPI_execute_plan(&plan, NULL,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+               DestReceiver *callback)
+{
+       _SPI_plan       plan;
+       int                     res;
+
+       if (src == NULL || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       memset(&plan, 0, sizeof(_SPI_plan));
+       plan.magic = _SPI_PLAN_MAGIC;
+       plan.cursor_options = 0;
+
+       _SPI_prepare_oneshot_plan(src, &plan);
+
+       res = _SPI_execute_plan(&plan, NULL,
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback)
+{
+       int                     res;
+
+       if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       if (plan->nargs > 0 && Values == NULL)
+               return SPI_ERROR_PARAM;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       res = _SPI_execute_plan(plan,
+                                                       
_SPI_convert_params(plan->nargs, plan->argtypes,
+                                                                               
                Values, Nulls),
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, 
ParamListInfo params,
 
        res = _SPI_execute_plan(plan, params,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        snapshot, 
crosscheck_snapshot,
-                                                       read_only, 
fire_triggers, tcount);
+                                                       read_only, 
fire_triggers, tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
 
        res = _SPI_execute_plan(&plan, paramLI,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr 
plan)
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount)
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback)
 {
        int                     my_res = 0;
        uint64          my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
        ErrorContextCallback spierrcontext;
        CachedPlan *cplan = NULL;
        ListCell   *lc1;
+       DestReceiver *dest = callback;
 
        /*
         * Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                {
                        Node       *stmt = (Node *) lfirst(lc2);
                        bool            canSetTag;
-                       DestReceiver *dest;
 
                        _SPI_current->processed = 0;
                        _SPI_current->lastoid = InvalidOid;
@@ -2072,7 +2128,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                UpdateActiveSnapshotCommandId();
                        }
 
-                       dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
+                       if (!callback)
+                               dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
 
                        if (IsA(stmt, PlannedStmt) &&
                                ((PlannedStmt *) stmt)->utilityStmt == NULL)
@@ -2098,6 +2155,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                        {
                                char            
completionTag[COMPLETION_TAG_BUFSIZE];
 
+                               // XXX throw error if callback is set
                                ProcessUtility(stmt,
                                                           
plansource->query_string,
                                                           
PROCESS_UTILITY_QUERY,
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 60a92801f1..20b6dbf4ff 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -81,6 +81,11 @@ static DestReceiver spi_printtupDR = {
        DestSPI
 };
 
+static DestReceiver spi_callbackDR = {
+       donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+       DestSPICallback
+};
+
 /* Globally available receiver for DestNone */
 DestReceiver *None_Receiver = &donothingDR;
 
@@ -117,6 +122,9 @@ CreateDestReceiver(CommandDest dest)
                case DestSPI:
                        return &spi_printtupDR;
 
+               case DestSPICallback:
+                       return &spi_callbackDR;
+
                case DestTuplestore:
                        return CreateTuplestoreDestReceiver();
 
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 76ba394a2b..15187c47e3 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
 extern int     SPI_connect(void);
 extern int     SPI_finish(void);
 extern int     SPI_execute(const char *src, bool read_only, long tcount);
+extern int     SPI_execute_callback(const char *src, bool read_only, long 
tcount,
+                                                                       
DestReceiver *callback);
 extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                                 bool read_only, long tcount);
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
                                                                ParamListInfo 
params,
                                                                bool read_only, 
long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback);
 extern int     SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                  long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index dd80433f74..5389f496a2 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -90,6 +90,7 @@ typedef enum
        DestRemote,                                     /* results sent to 
frontend process */
        DestRemoteExecute,                      /* sent to frontend, in Execute 
command */
        DestSPI,                                        /* results sent to SPI 
manager */
+       DestSPICallback,                        /* results sent to 
user-specified callback function */
        DestTuplestore,                         /* results sent to Tuplestore */
        DestIntoRel,                            /* results sent to relation 
(SELECT INTO) */
        DestCopyOut,                            /* results sent to COPY TO code 
*/
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
        return PLy_execution_contexts;
 }
 
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+       PLyExecutionContext *last = PLy_execution_contexts;
+
+       if (PLy_execution_contexts == NULL)
+               elog(ERROR, "no Python function is currently executing");
+
+       PLy_execution_contexts = new;
+
+       return last;
+}
+
 MemoryContext
 PLy_get_scratch_context(PLyExecutionContext *context)
 {
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..7cbe0a8d35 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
 /* Get the current execution context */
 extern PLyExecutionContext *PLy_current_execution_context(void);
 
+/* Get switch execution contexts */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext 
*new);
+
 /* Get the scratch memory context for specified execution context */
 extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
 
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..a2096b6506 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,8 +28,27 @@
 #include "plpy_procedure.h"
 #include "plpy_resultobject.h"
 
+typedef struct
+{
+       DestReceiver pub;
+       PLyExecutionContext *exec_ctx;
+       MemoryContext mctx;
+       TupleDesc       desc;
+       PLyTypeInfo *args;
+
+       /* Dictionary of Lists */
+       PyObject        *dict;
+       PyObject        **lists;
+} CallbackState;
+
+
+
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
+static PyObject *PLy_spi_execute_query2(char *query, long limit);
 static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit);
 static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
                                                         uint64 rows, int 
status);
@@ -341,8 +360,156 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
        return ret;
 }
 
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+       PLyExecutionContext *old_exec_ctx;
+       CallbackState *myState = (CallbackState *) self;
+       PLyTypeInfo args;
+       
+       MemoryContext mctx, old_mctx;
+       PyObject *dict;
+       PyObject **lists;
+       int                     i;
+
+       /*
+        * We may be in a different execution context when we're called, so 
switch
+        * back to our original one.
+        */
+       mctx = myState->mctx;
+       old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+       old_mctx = MemoryContextSwitchTo(mctx);
+       
+       /* Store this as a sanity check */
+       myState->desc = typeinfo;
+
+       /* Setup type conversion info */
+       PLy_typeinfo_init(&args, mctx);
+       myState->args = &args;
+       PLy_input_tuple_funcs(&args, typeinfo);
+
+       /*
+        * We never palloc python objects, but this is an array of object 
pointers,
+        * so it's OK.
+        */
+       myState->lists = lists = palloc0(args.in.r.natts * sizeof(PyObject *));
+
+       myState->dict = dict = PyDict_New();
+       if (dict == NULL)
+               PLy_elog(ERROR, "could not create new dictionary");
+
+
+       for (i = 0; i < args.in.r.natts; i++)
+       {
+               char       *key;
+               PyObject   *value;
+
+               if (typeinfo->attrs[i]->attisdropped)
+                       continue;
+
+               /* NOTE: If size is > 0 then the list must get initialized! */
+               value = PyList_New(0);
+               if (value == NULL)
+                       PLy_elog(ERROR, "could not create new list");
+
+               key = NameStr(typeinfo->attrs[i]->attname);
+               PyDict_SetItemString(dict, key, value);
+               Py_DECREF(value);
+
+               /* We want fast access to the lists, so we store them in our 
array of pointers */
+               lists[i] = value;
+       }
+
+       MemoryContextSwitchTo(old_mctx);
+       PLy_switch_execution_context(old_exec_ctx);
+}
+
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+       CallbackState *myState = (CallbackState *) self;
+       
+       MemoryContextDelete(myState->mctx);
+}
+
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+       volatile TupleDesc      desc = slot->tts_tupleDescriptor;
+       volatile CallbackState *myState = (CallbackState *) self;
+       volatile PLyTypeInfo *args = myState->args;
+
+       PLyExecutionContext *old_exec_ctx = 
PLy_switch_execution_context(myState->exec_ctx);
+       MemoryContext scratch_context = 
PLy_get_scratch_context(myState->exec_ctx);
+       MemoryContext oldcontext = CurrentMemoryContext;
+
+       /* Verify saved state matches incoming slot */
+       Assert(myState->desc == desc);
+       Assert(args->in.r.natts == desc->natts);
+
+       /* Make sure the tuple is fully deconstructed */
+       slot_getallattrs(slot);
+
+       MemoryContextSwitchTo(scratch_context);
+
+       PG_TRY();
+       {
+               int                     i, rv;
+
+               /*
+                * Do the work in the scratch context to avoid leaking memory 
from the
+                * datatype output function calls.
+                */
+               for (i = 0; i < desc->natts; i++)
+               {
+                       PyObject   *value = NULL;
+
+                       if (desc->attrs[i]->attisdropped)
+                               continue;
+
+                       if (myState->lists[i] == NULL)
+                               ereport(ERROR,
+                                               (errmsg("missing list for 
attribute %d", i)));
+                       /* XXX If the function can't be null, ditch that check 
*/
+                       if (slot->tts_isnull[i] || args->in.r.atts[i].func == 
NULL)
+                       {
+                               Py_INCREF(Py_None);
+                               value = Py_None;
+                       }
+                       else
+                               value = (args->in.r.atts[i].func) 
(&args->in.r.atts[i], slot->tts_values[i]);
+
+                       /*
+                        * If we tried to do this in the PG_CATCH we'd have to 
mark value
+                        * as volatile, but that won't work with PyList_Append, 
so just
+                        * test the error code after doing Py_DECREF().
+                        */
+                       rv = PyList_Append(myState->lists[i], value);
+                       Py_DECREF(value);
+                       
+                       if (rv != 0)
+                               ereport(ERROR,
+                                               (errmsg("unable to append value 
to list")));
+               }
+               MemoryContextSwitchTo(oldcontext);
+               /* Should we just do this once, for the whole tuple?? */
+               MemoryContextReset(scratch_context);
+       }
+       PG_CATCH();
+       {
+               MemoryContextSwitchTo(oldcontext);
+               PLy_switch_execution_context(old_exec_ctx);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+       PLy_switch_execution_context(old_exec_ctx);
+
+       /* If we get here then we were successful */
+       return true;
+}
+
 static PyObject *
-PLy_spi_execute_query(char *query, long limit)
+PLy_spi_execute_query2(char *query, long limit)
 {
        int                     rv;
        volatile MemoryContext oldcontext;
@@ -384,6 +551,67 @@ PLy_spi_execute_query(char *query, long limit)
 }
 
 static PyObject *
+PLy_spi_execute_query(char *query, long limit)
+{
+       PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+       int                     rv;
+       volatile MemoryContext oldcontext, cb_ctx;
+       volatile ResourceOwner oldowner;
+       PyObject   *ret = NULL;
+       CallbackState   *callback;
+
+       oldowner = CurrentResourceOwner;
+
+       /* Use a new context to make cleanup easier */
+       cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+                                                               "PL/Python 
callback context",
+                                                               
ALLOCSET_DEFAULT_SIZES);
+
+       oldcontext = MemoryContextSwitchTo(cb_ctx);
+       callback = palloc0(sizeof(CallbackState));
+       callback->mctx = cb_ctx;
+       memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), 
sizeof(DestReceiver));
+       callback->pub.receiveSlot = PLy_CSreceive;
+       callback->pub.rStartup = PLy_CSStartup;
+       callback->pub.rDestroy = PLy_CSDestroy;
+       callback->exec_ctx = exec_ctx;
+
+       PLy_spi_subtransaction_begin(oldcontext, oldowner);
+
+       PG_TRY();
+       {
+
+               pg_verifymbstr(query, strlen(query), false);
+               rv = SPI_execute_callback(query, 
exec_ctx->curr_proc->fn_readonly, limit,
+                               (DestReceiver *) callback);
+               /*
+                * callback->dict gets set in PLy_CSStartup, which happens 
during
+                * executor startup. It's not valid before then.
+                */
+               ret = callback->dict;
+
+               PLy_spi_subtransaction_commit(oldcontext, oldowner);
+       }
+       PG_CATCH();
+       {
+               PLy_spi_subtransaction_abort(oldcontext, oldowner);
+               return NULL;
+       }
+       PG_END_TRY();
+
+       if (rv < 0)
+       {
+               Py_XDECREF(ret);
+               PLy_exception_set(PLy_exc_spi_error,
+                                                 "SPI_execute failed: %s",
+                                                 SPI_result_code_string(rv));
+               return NULL;
+       }
+
+       return ret;
+}
+
+static PyObject *
 PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 {
        PLyResultObject *result;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to