In the attached patch (snippet below), I'm seeing something strange with
args->in.r.atts[]. Prior to entering the PG_TRY block, I can inspect
things in lldb just fine:
(lldb) call args->in.r.atts[0].func
(PLyDatumToObFunc) $49 = 0x000000010fc4dc70
(plpython3.so`PLyString_FromDatum at plpy_typeio.c:621)
(lldb) call &args->in.r.atts[0]
(PLyDatumToOb *) $50 = 0x00007fd2b302f6d0
(lldb) call args->in.r.atts[0]
(PLyDatumToOb) $51 = {
func = 0x000000010fc4dc70 (plpython3.so`PLyString_FromDatum at
plpy_typeio.c:621)
typfunc = {
fn_addr = 0x000000010f478b50 (postgres`textout at varlena.c:521)
fn_oid = 47
...
But I'm getting a EXC_BAD_ACCESS (code=1, address=0xb302f6d0) on the
last if in the snippet below. Looking at the variables again, I see:
(lldb) call args->in.r.atts[i].func
error: Couldn't apply expression side effects : Couldn't dematerialize a
result variable: couldn't read its memory
(lldb) call args->in.r.atts[i]
error: Couldn't apply expression side effects : Couldn't dematerialize a
result variable: couldn't read its memory
I saw the comment on PG_TRY about marking things as volatile, but my
understanding from the comment is I shouldn't even need to do that,
since these variables aren't being modified.
static bool
PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
{
volatile TupleDesc desc = slot->tts_tupleDescriptor;
volatile CallbackState *myState = (CallbackState *) self;
volatile PLyTypeInfo *args = myState->args;
PLyExecutionContext *old_exec_ctx =
PLy_switch_execution_context(myState->exec_ctx);
MemoryContext scratch_context =
PLy_get_scratch_context(myState->exec_ctx);
MemoryContext oldcontext = CurrentMemoryContext;
/* Verify saved state matches incoming slot */
Assert(myState->desc == desc);
Assert(args->in.r.natts == desc->natts);
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
MemoryContextSwitchTo(scratch_context);
PG_TRY();
{
int i, rv;
/*
* Do the work in the scratch context to avoid leaking memory
from the
* datatype output function calls.
*/
for (i = 0; i < desc->natts; i++)
{
PyObject *value = NULL;
if (desc->attrs[i]->attisdropped)
continue;
if (myState->lists[i] == NULL)
ereport(ERROR,
(errmsg("missing list for attribute
%d", i)));
/* XXX If the function can't be null, ditch that check
*/
if (slot->tts_isnull[i] || args->in.r.atts[i].func ==
NULL)
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 80fc4c4725..ace30d75f0 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src,
SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot
crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64
tcount);
+ bool read_only, bool fire_triggers, uint64
tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot,
InvalidSnapshot,
- read_only, true,
tcount);
+ read_only, true,
tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot,
InvalidSnapshot,
+ read_only, true,
tcount, callback);
_SPI_end_call(true);
return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const
char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot,
InvalidSnapshot,
- read_only, true,
tcount);
+ read_only, true,
tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver
*callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+
_SPI_convert_params(plan->nargs, plan->argtypes,
+
Values, Nulls),
+ InvalidSnapshot,
InvalidSnapshot,
+ read_only, true,
tcount, callback);
_SPI_end_call(true);
return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot,
InvalidSnapshot,
- read_only, true,
tcount);
+ read_only, true,
tcount, NULL);
_SPI_end_call(true);
return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot,
crosscheck_snapshot,
- read_only,
fire_triggers, tcount);
+ read_only,
fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot,
InvalidSnapshot,
- read_only, true,
tcount);
+ read_only, true,
tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr
plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot
crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64
tcount)
+ bool read_only, bool fire_triggers, uint64
tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
Node *stmt = (Node *) lfirst(lc2);
bool canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2072,7 +2128,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI :
DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI :
DestNone);
if (IsA(stmt, PlannedStmt) &&
((PlannedStmt *) stmt)->utilityStmt == NULL)
@@ -2098,6 +2155,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char
completionTag[COMPLETION_TAG_BUFSIZE];
+ // XXX throw error if callback is set
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 60a92801f1..20b6dbf4ff 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -81,6 +81,11 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -117,6 +122,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 76ba394a2b..15187c47e3 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long
tcount,
+
DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo
params,
bool read_only,
long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const
char *Nulls,
+ bool read_only, long tcount, DestReceiver
*callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index dd80433f74..5389f496a2 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -90,6 +90,7 @@ typedef enum
DestRemote, /* results sent to
frontend process */
DestRemoteExecute, /* sent to frontend, in Execute
command */
DestSPI, /* results sent to SPI
manager */
+ DestSPICallback, /* results sent to
user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation
(SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code
*/
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
return PLy_execution_contexts;
}
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+ PLyExecutionContext *last = PLy_execution_contexts;
+
+ if (PLy_execution_contexts == NULL)
+ elog(ERROR, "no Python function is currently executing");
+
+ PLy_execution_contexts = new;
+
+ return last;
+}
+
MemoryContext
PLy_get_scratch_context(PLyExecutionContext *context)
{
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..7cbe0a8d35 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
/* Get the current execution context */
extern PLyExecutionContext *PLy_current_execution_context(void);
+/* Get switch execution contexts */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext
*new);
+
/* Get the scratch memory context for specified execution context */
extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..a2096b6506 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,8 +28,27 @@
#include "plpy_procedure.h"
#include "plpy_resultobject.h"
+typedef struct
+{
+ DestReceiver pub;
+ PLyExecutionContext *exec_ctx;
+ MemoryContext mctx;
+ TupleDesc desc;
+ PLyTypeInfo *args;
+
+ /* Dictionary of Lists */
+ PyObject *dict;
+ PyObject **lists;
+} CallbackState;
+
+
+
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
static PyObject *PLy_spi_execute_query(char *query, long limit);
+static PyObject *PLy_spi_execute_query2(char *query, long limit);
static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long
limit);
static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
uint64 rows, int
status);
@@ -341,8 +360,156 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long
limit)
return ret;
}
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ PLyExecutionContext *old_exec_ctx;
+ CallbackState *myState = (CallbackState *) self;
+ PLyTypeInfo args;
+
+ MemoryContext mctx, old_mctx;
+ PyObject *dict;
+ PyObject **lists;
+ int i;
+
+ /*
+ * We may be in a different execution context when we're called, so
switch
+ * back to our original one.
+ */
+ mctx = myState->mctx;
+ old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ old_mctx = MemoryContextSwitchTo(mctx);
+
+ /* Store this as a sanity check */
+ myState->desc = typeinfo;
+
+ /* Setup type conversion info */
+ PLy_typeinfo_init(&args, mctx);
+ myState->args = &args;
+ PLy_input_tuple_funcs(&args, typeinfo);
+
+ /*
+ * We never palloc python objects, but this is an array of object
pointers,
+ * so it's OK.
+ */
+ myState->lists = lists = palloc0(args.in.r.natts * sizeof(PyObject *));
+
+ myState->dict = dict = PyDict_New();
+ if (dict == NULL)
+ PLy_elog(ERROR, "could not create new dictionary");
+
+
+ for (i = 0; i < args.in.r.natts; i++)
+ {
+ char *key;
+ PyObject *value;
+
+ if (typeinfo->attrs[i]->attisdropped)
+ continue;
+
+ /* NOTE: If size is > 0 then the list must get initialized! */
+ value = PyList_New(0);
+ if (value == NULL)
+ PLy_elog(ERROR, "could not create new list");
+
+ key = NameStr(typeinfo->attrs[i]->attname);
+ PyDict_SetItemString(dict, key, value);
+ Py_DECREF(value);
+
+ /* We want fast access to the lists, so we store them in our
array of pointers */
+ lists[i] = value;
+ }
+
+ MemoryContextSwitchTo(old_mctx);
+ PLy_switch_execution_context(old_exec_ctx);
+}
+
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+
+ MemoryContextDelete(myState->mctx);
+}
+
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+ volatile TupleDesc desc = slot->tts_tupleDescriptor;
+ volatile CallbackState *myState = (CallbackState *) self;
+ volatile PLyTypeInfo *args = myState->args;
+
+ PLyExecutionContext *old_exec_ctx =
PLy_switch_execution_context(myState->exec_ctx);
+ MemoryContext scratch_context =
PLy_get_scratch_context(myState->exec_ctx);
+ MemoryContext oldcontext = CurrentMemoryContext;
+
+ /* Verify saved state matches incoming slot */
+ Assert(myState->desc == desc);
+ Assert(args->in.r.natts == desc->natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ MemoryContextSwitchTo(scratch_context);
+
+ PG_TRY();
+ {
+ int i, rv;
+
+ /*
+ * Do the work in the scratch context to avoid leaking memory
from the
+ * datatype output function calls.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ PyObject *value = NULL;
+
+ if (desc->attrs[i]->attisdropped)
+ continue;
+
+ if (myState->lists[i] == NULL)
+ ereport(ERROR,
+ (errmsg("missing list for
attribute %d", i)));
+ /* XXX If the function can't be null, ditch that check
*/
+ if (slot->tts_isnull[i] || args->in.r.atts[i].func ==
NULL)
+ {
+ Py_INCREF(Py_None);
+ value = Py_None;
+ }
+ else
+ value = (args->in.r.atts[i].func)
(&args->in.r.atts[i], slot->tts_values[i]);
+
+ /*
+ * If we tried to do this in the PG_CATCH we'd have to
mark value
+ * as volatile, but that won't work with PyList_Append,
so just
+ * test the error code after doing Py_DECREF().
+ */
+ rv = PyList_Append(myState->lists[i], value);
+ Py_DECREF(value);
+
+ if (rv != 0)
+ ereport(ERROR,
+ (errmsg("unable to append value
to list")));
+ }
+ MemoryContextSwitchTo(oldcontext);
+ /* Should we just do this once, for the whole tuple?? */
+ MemoryContextReset(scratch_context);
+ }
+ PG_CATCH();
+ {
+ MemoryContextSwitchTo(oldcontext);
+ PLy_switch_execution_context(old_exec_ctx);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ PLy_switch_execution_context(old_exec_ctx);
+
+ /* If we get here then we were successful */
+ return true;
+}
+
static PyObject *
-PLy_spi_execute_query(char *query, long limit)
+PLy_spi_execute_query2(char *query, long limit)
{
int rv;
volatile MemoryContext oldcontext;
@@ -384,6 +551,67 @@ PLy_spi_execute_query(char *query, long limit)
}
static PyObject *
+PLy_spi_execute_query(char *query, long limit)
+{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ int rv;
+ volatile MemoryContext oldcontext, cb_ctx;
+ volatile ResourceOwner oldowner;
+ PyObject *ret = NULL;
+ CallbackState *callback;
+
+ oldowner = CurrentResourceOwner;
+
+ /* Use a new context to make cleanup easier */
+ cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "PL/Python
callback context",
+
ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cb_ctx);
+ callback = palloc0(sizeof(CallbackState));
+ callback->mctx = cb_ctx;
+ memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback),
sizeof(DestReceiver));
+ callback->pub.receiveSlot = PLy_CSreceive;
+ callback->pub.rStartup = PLy_CSStartup;
+ callback->pub.rDestroy = PLy_CSDestroy;
+ callback->exec_ctx = exec_ctx;
+
+ PLy_spi_subtransaction_begin(oldcontext, oldowner);
+
+ PG_TRY();
+ {
+
+ pg_verifymbstr(query, strlen(query), false);
+ rv = SPI_execute_callback(query,
exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+ /*
+ * callback->dict gets set in PLy_CSStartup, which happens
during
+ * executor startup. It's not valid before then.
+ */
+ ret = callback->dict;
+
+ PLy_spi_subtransaction_commit(oldcontext, oldowner);
+ }
+ PG_CATCH();
+ {
+ PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ if (rv < 0)
+ {
+ Py_XDECREF(ret);
+ PLy_exception_set(PLy_exc_spi_error,
+ "SPI_execute failed: %s",
+ SPI_result_code_string(rv));
+ return NULL;
+ }
+
+ return ret;
+}
+
+static PyObject *
PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
{
PLyResultObject *result;
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers