Hi,

attached is a patch implementing the usage of SPI cursors in PL/Python. Currently, when trying to process a large table in PL/Python, you have to slurp it all into memory (that's what plpy.execute does).

This patch allows reading the result set in smaller chunks, using an SPI cursor behind the scenes. Example usage:

    cursor = plpy.cursor("select a, b from hugetable")
    for row in cursor:
        plpy.info("a is %s and b is %s" % (row['a'], row['b']))

The patch itself is simple, but there's a lot of boilerplate dedicated to opening a subtransaction and handling prepared plans. I'd like to do some refactoring of the way PL/Python uses SPI to reduce the amount of boilerplate needed, but that'll come as a separate patch (just before the patch to split plpython.c into smaller chunks).

This feature has been sponsored by Nomao.

Cheers,
Jan

PS: I already added it to the November CF.
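To make the cursor contract that this patch documents and tests concrete outside a database, here is a pure-Python sketch. `SimulatedCursor` is a hypothetical stand-in backed by an in-memory list of dicts; it is not plpy.cursor and never talks to PostgreSQL. It only mimics the documented behaviour: `fetch(n)` returns at most n rows and keeps returning an empty batch once the rows are exhausted, iteration yields one dict per row and shares its position with `fetch`, and `close()` may be called twice, after which fetching or iterating raises ValueError. One simplification: the real `fetch` returns a result object, while this sketch returns a plain list. The sample rows are made up, chosen only to be consistent with the expected regression output (three `doe` rows, `rick` third in `fname` order).

```python
from typing import Dict, Iterable, List

Row = Dict[str, object]


class SimulatedCursor:
    """Hypothetical in-memory stand-in for a plpy.cursor object.

    It does not talk to PostgreSQL; it only mimics the documented contract.
    """

    def __init__(self, rows: Iterable[Row]) -> None:
        self._rows: List[Row] = list(rows)  # one dict per result row
        self._pos = 0                       # index of the next unread row
        self._closed = False

    def fetch(self, count: int) -> List[Row]:
        # At most `count` rows; an empty list once everything has been read.
        if self._closed:
            raise ValueError("fetch on a closed cursor")
        batch = self._rows[self._pos:self._pos + count]
        self._pos += len(batch)
        return batch

    def close(self) -> None:
        # Idempotent, as the double_cursor_close regression test expects.
        self._closed = True

    def __iter__(self) -> "SimulatedCursor":
        return self

    def __next__(self) -> Row:
        # Shares the read position with fetch(), so the two can be mixed.
        if self._closed:
            raise ValueError("iterating a closed cursor")
        if self._pos >= len(self._rows):
            raise StopIteration
        row = self._rows[self._pos]
        self._pos += 1
        return row

    next = __next__  # Python 2 spelling, which the regression tests try first


if __name__ == "__main__":
    users = [
        {"fname": "jane", "lname": "doe"},
        {"fname": "john", "lname": "doe"},
        {"fname": "rick", "lname": "smith"},
        {"fname": "willem", "lname": "doe"},
    ]
    cur = SimulatedCursor(users)
    print([len(cur.fetch(3)) for _ in range(4)])  # [3, 1, 0, 0]
```

The four `fetch(3)` calls mirror the `cursor_fetch` regression test: a full batch, the single leftover row, then empty batches from then on.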
From 9ad14957e7b4ae19667df3bb8cc2aa5ef5bf96c8 Mon Sep 17 00:00:00 2001
From: Jan Urbański <wulc...@wulczer.org>
Date: Tue, 13 Sep 2011 14:42:41 +0200
Subject: [PATCH] Add cursor support to plpythonu.

Exposes SPI cursors as plpythonu objects allowing processing large result
sets without loading them entirely into memory, as plpy.execute is doing.
---
 doc/src/sgml/plpython.sgml                 |   80 ++++
 src/pl/plpython/expected/plpython_spi.out  |  151 +++++++
 src/pl/plpython/expected/plpython_test.out |    6 +-
 src/pl/plpython/plpython.c                 |  605 ++++++++++++++++++++++++++++
 src/pl/plpython/sql/plpython_spi.sql       |  116 ++++++
 5 files changed, 955 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml index eda2bbf..d08c3d1 100644 *** a/doc/src/sgml/plpython.sgml --- b/doc/src/sgml/plpython.sgml *************** $$ LANGUAGE plpythonu; *** 892,897 **** --- 892,906 ---- </para> <para> + Note that calling <literal>plpy.execute</literal> will cause the entire + result set to be read into memory. Only use that function when you are sure + that the result set will be relatively small. If you don't want to risk + excessive memory usage when fetching large results, + use <literal>plpy.cursor</literal> rather + than <literal>plpy.execute</literal>. + </para> + + <para> For example: <programlisting> rv = plpy.execute("SELECT * FROM my_table", 5) *************** $$ LANGUAGE plpythonu; *** 958,963 **** --- 967,1043 ---- </sect2> + <sect2> + <title>Accessing data with cursors</title> + + <para> + The <literal>plpy.cursor</literal> function accepts the same arguments + as <literal>plpy.execute</literal> (except for <literal>limit</literal>) + and returns a cursor object, which allows you to process large result sets + in smaller chunks. As with <literal>plpy.execute</literal>, either a query + string or a plan object along with a list of arguments can be used.
The + cursor object provides a <literal>fetch</literal> method that accepts an + integer parameter and returns a result object. Each time you + call <literal>fetch</literal>, the returned object will contain the next + batch of rows, never larger than the parameter value. Once all rows are + exhausted, <literal>fetch</literal> starts returning an empty result + object. Cursor objects also provide an + <ulink url="http://docs.python.org/library/stdtypes.html#iterator-types">iterator + interface</ulink>, yielding one row at a time until all rows are exhausted. + Data fetched that way is not returned as result objects, but rather as + dictionaries, each dictionary corresponding to a single result row. + </para> + + <para> + Cursors are automatically disposed of, but if you want to explicitly + release all resources held by a cursor, use the <literal>close</literal> + method. Once closed, a cursor cannot be fetched from anymore. + </para> + + <note> + <para> + Do not confuse objects created by <literal>plpy.cursor</literal> with + DBAPI cursors as defined by + the <ulink url="http://www.python.org/dev/peps/pep-0249/">Python Database API specification</ulink>. + They don't have anything in common except for the name.
+ </para> + </note> + + <para> + Here are a few ways of processing data from a large table: + <programlisting> + CREATE FUNCTION count_odd_iterator() RETURNS integer AS $$ + odd = 0 + for row in plpy.cursor("select num from largetable"): + if row['num'] % 2: + odd += 1 + return odd + $$ LANGUAGE plpythonu; + + CREATE FUNCTION count_odd_fetch(batch_size integer) RETURNS integer AS $$ + odd = 0 + cursor = plpy.cursor("select num from largetable") + while True: + rows = cursor.fetch(batch_size) + if not rows: + break + for row in rows: + if row['num'] % 2: + odd += 1 + return odd + $$ LANGUAGE plpythonu; + + CREATE FUNCTION count_odd_prepared() RETURNS integer AS $$ + odd = 0 + plan = plpy.prepare("select num from largetable where num % $1 != 0", ["integer"]) + rows = list(plpy.cursor(plan, [2])) + + return len(rows) + $$ LANGUAGE plpythonu; + </programlisting> + </para> + </sect2> + <sect2 id="plpython-trapping"> <title>Trapping Errors</title> diff --git a/src/pl/plpython/expected/plpython_spi.out b/src/pl/plpython/expected/plpython_spi.out index 7f4ae5c..3b4d7a3 100644 *** a/src/pl/plpython/expected/plpython_spi.out --- b/src/pl/plpython/expected/plpython_spi.out *************** CONTEXT: PL/Python function "result_nro *** 133,135 **** --- 133,286 ---- 2 (1 row) + -- cursor objects + CREATE FUNCTION simple_cursor_test() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + does = 0 + for row in res: + if row['lname'] == 'doe': + does += 1 + return does + $$ LANGUAGE plpythonu; + CREATE FUNCTION double_cursor_close() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + res.close() + res.close() + $$ LANGUAGE plpythonu; + CREATE FUNCTION cursor_fetch() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + assert len(res.fetch(3)) == 3 + assert len(res.fetch(3)) == 1 + assert len(res.fetch(3)) == 0 + assert len(res.fetch(3)) == 0 + try: + # use next() or __next__(), the method name changed in + #
http://www.python.org/dev/peps/pep-3114/ + try: + res.next() + except AttributeError: + res.__next__() + except StopIteration: + pass + else: + assert False, "StopIteration not raised" + $$ LANGUAGE plpythonu; + CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users order by fname") + assert len(res.fetch(2)) == 2 + + item = None + try: + item = res.next() + except AttributeError: + item = res.__next__() + assert item['fname'] == 'rick' + + assert len(res.fetch(2)) == 1 + $$ LANGUAGE plpythonu; + CREATE FUNCTION fetch_after_close() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + res.close() + try: + res.fetch(1) + except ValueError: + pass + else: + assert False, "ValueError not raised" + $$ LANGUAGE plpythonu; + CREATE FUNCTION next_after_close() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + res.close() + try: + try: + res.next() + except AttributeError: + res.__next__() + except ValueError: + pass + else: + assert False, "ValueError not raised" + $$ LANGUAGE plpythonu; + CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users where false") + assert len(res.fetch(1)) == 0 + try: + try: + res.next() + except AttributeError: + res.__next__() + except StopIteration: + pass + else: + assert False, "StopIteration not raised" + $$ LANGUAGE plpythonu; + CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$ + plan = plpy.prepare( + "select fname, lname from users where fname like $1 || '%' order by fname", + ["text"]) + for row in plpy.cursor(plan, ["w"]): + yield row['fname'] + for row in plpy.cursor(plan, ["j"]): + yield row['fname'] + $$ LANGUAGE plpythonu; + CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$ + plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'", + ["text"]) + c = plpy.cursor(plan, ["a", "b"]) + $$ LANGUAGE plpythonu; + SELECT 
simple_cursor_test(); + simple_cursor_test + -------------------- + 3 + (1 row) + + SELECT double_cursor_close(); + double_cursor_close + --------------------- + + (1 row) + + SELECT cursor_fetch(); + cursor_fetch + -------------- + + (1 row) + + SELECT cursor_mix_next_and_fetch(); + cursor_mix_next_and_fetch + --------------------------- + + (1 row) + + SELECT fetch_after_close(); + fetch_after_close + ------------------- + + (1 row) + + SELECT next_after_close(); + next_after_close + ------------------ + + (1 row) + + SELECT cursor_fetch_next_empty(); + cursor_fetch_next_empty + ------------------------- + + (1 row) + + SELECT cursor_plan(); + cursor_plan + ------------- + willem + jane + john + (3 rows) + + SELECT cursor_plan_wrong_args(); + ERROR: TypeError: Expected sequence of 1 argument, got 2: ['a', 'b'] + CONTEXT: Traceback (most recent call last): + PL/Python function "cursor_plan_wrong_args", line 4, in <module> + c = plpy.cursor(plan, ["a", "b"]) + PL/Python function "cursor_plan_wrong_args" diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out index f2dda66..a884fc0 100644 *** a/src/pl/plpython/expected/plpython_test.out --- b/src/pl/plpython/expected/plpython_test.out *************** contents.sort() *** 43,51 **** return ", ".join(contents) $$ LANGUAGE plpythonu; select module_contents(); ! module_contents ! ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- ! Error, Fatal, SPIError, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning (1 row) CREATE FUNCTION elog_test() RETURNS void --- 43,51 ---- return ", ".join(contents) $$ LANGUAGE plpythonu; select module_contents(); ! module_contents ! 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ ! Error, Fatal, SPIError, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning (1 row) CREATE FUNCTION elog_test() RETURNS void diff --git a/src/pl/plpython/plpython.c b/src/pl/plpython/plpython.c index 93e8043..d56b2d7 100644 *** a/src/pl/plpython/plpython.c --- b/src/pl/plpython/plpython.c *************** typedef int Py_ssize_t; *** 134,139 **** --- 134,144 ---- PyObject_HEAD_INIT(type) size, #endif + /* Python 3 removed the Py_TPFLAGS_HAVE_ITER flag */ + #if PY_MAJOR_VERSION >= 3 + #define Py_TPFLAGS_HAVE_ITER 0 + #endif + /* define our text domain for translations */ #undef TEXTDOMAIN #define TEXTDOMAIN PG_TEXTDOMAIN("plpython") *************** typedef struct PLySubtransactionObject *** 310,315 **** --- 315,328 ---- bool exited; } PLySubtransactionObject; + typedef struct PLyCursorObject + { + PyObject_HEAD + Portal portal; + PLyTypeInfo result; + bool closed; + } PLyCursorObject; + /* A list of all known exceptions, generated from backend/utils/errcodes.txt */ typedef struct ExceptionMap { *************** static char PLy_subtransaction_doc[] = { *** 486,491 **** --- 499,508 ---- "PostgreSQL subtransaction context manager" }; + static char PLy_cursor_doc[] = { + "Wrapper around a PostgreSQL cursor" + }; + /* * the function definitions *************** static void PLy_subtransaction_dealloc(P *** 2963,2968 **** --- 2980,2993 ---- static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *); static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *); + static PyObject *PLy_cursor(PyObject *, PyObject *); + static PyObject *PLy_cursor_query(char *); + static PyObject *PLy_cursor_plan(PyObject *, PyObject *); + static void PLy_cursor_dealloc(PyObject *); + static PyObject 
*PLy_cursor_iternext(PyObject *); + static PyObject *PLy_cursor_fetch(PyObject *, PyObject *); + static PyObject *PLy_cursor_close(PyObject *, PyObject *); + static PyMethodDef PLy_plan_methods[] = { {"status", PLy_plan_status, METH_VARARGS, NULL}, *************** static PyTypeObject PLy_SubtransactionTy *** 3099,3104 **** --- 3124,3170 ---- PLy_subtransaction_methods, /* tp_tpmethods */ }; + static PyMethodDef PLy_cursor_methods[] = { + {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL}, + {"close", PLy_cursor_close, METH_NOARGS, NULL}, + {NULL, NULL, 0, NULL} + }; + + static PyTypeObject PLy_CursorType = { + PyVarObject_HEAD_INIT(NULL, 0) + "PLyCursor", /* tp_name */ + sizeof(PLyCursorObject), /* tp_size */ + 0, /* tp_itemsize */ + + /* + * methods + */ + PLy_cursor_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, /* tp_flags */ + PLy_cursor_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + PyObject_SelfIter, /* tp_iter */ + PLy_cursor_iternext, /* tp_iternext */ + PLy_cursor_methods, /* tp_tpmethods */ + }; + static PyMethodDef PLy_methods[] = { /* * logging methods *************** static PyMethodDef PLy_methods[] = { *** 3133,3138 **** --- 3199,3209 ---- */ {"subtransaction", PLy_subtransaction, METH_NOARGS, NULL}, + /* + * create a cursor + */ + {"cursor", PLy_cursor, METH_VARARGS, NULL}, + {NULL, NULL, 0, NULL} }; *************** PLy_spi_execute_fetch_result(SPITupleTab *** 3833,3838 **** --- 3904,4441 ---- return (PyObject *) result; } + /* + * c = plpy.cursor("select * from largetable") + * c = plpy.cursor(plan, []) + */ + static PyObject * + 
PLy_cursor(PyObject *self, PyObject *args) + { + char *query; + PyObject *plan; + PyObject *planargs = NULL; + + if (PyArg_ParseTuple(args, "s", &query)) + return PLy_cursor_query(query); + + PyErr_Clear(); + + if (PyArg_ParseTuple(args, "O|O", &plan, &planargs)) + return PLy_cursor_plan(plan, planargs); + + PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan"); + return NULL; + } + + static PyObject * + PLy_cursor_query(char *query) + { + PLyCursorObject *cursor; + volatile MemoryContext oldcontext; + volatile ResourceOwner oldowner; + + if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL) + return NULL; + cursor->portal = NULL; + cursor->closed = false; + PLy_typeinfo_init(&cursor->result); + + oldcontext = CurrentMemoryContext; + oldowner = CurrentResourceOwner; + + BeginInternalSubTransaction(NULL); + MemoryContextSwitchTo(oldcontext); + + PG_TRY(); + { + SPIPlanPtr plan; + Portal portal; + + pg_verifymbstr(query, strlen(query), false); + + plan = SPI_prepare(query, 0, NULL); + if (plan == NULL) + elog(ERROR, "SPI_prepare failed: %s", + SPI_result_code_string(SPI_result)); + + portal = SPI_cursor_open(NULL, plan, NULL, NULL, + PLy_curr_procedure->fn_readonly); + SPI_freeplan(plan); + + if (portal == NULL) + elog(ERROR, "SPI_cursor_open() failed: %s", + SPI_result_code_string(SPI_result)); + + cursor->portal = portal; + + /* Commit the inner transaction, return to outer xact context */ + ReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* + * AtEOSubXact_SPI() should not have popped any SPI context, but just + * in case it did, make sure we remain connected.
+ */ + SPI_restore_connection(); + } + PG_CATCH(); + { + ErrorData *edata; + PLyExceptionEntry *entry; + PyObject *exc; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Abort the inner transaction */ + RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + Py_DECREF(cursor); + + /* + * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will + * have left us in a disconnected state. We need this hack to return + * to connected state. + */ + SPI_restore_connection(); + + /* Look up the correct exception */ + entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), + HASH_FIND, NULL); + /* We really should find it, but just in case have a fallback */ + Assert(entry != NULL); + exc = entry ? entry->exc : PLy_exc_spi_error; + /* Make Python raise the exception */ + PLy_spi_exception_set(exc, edata); + return NULL; + } + PG_END_TRY(); + + Assert(cursor->portal != NULL); + return (PyObject *) cursor; + } + + static PyObject * + PLy_cursor_plan(PyObject *ob, PyObject *args) + { + PLyCursorObject *cursor; + volatile int nargs; + int i; + PLyPlanObject *plan; + volatile MemoryContext oldcontext; + volatile ResourceOwner oldowner; + + if (args != NULL) + { + if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args)) + { + PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument"); + return NULL; + } + nargs = PySequence_Length(args); + } + else + nargs = 0; + + plan = (PLyPlanObject *) ob; + + if (nargs != plan->nargs) + { + char *sv; + PyObject *so = PyObject_Str(args); + + if (!so) + PLy_elog(ERROR, "could not execute plan"); + sv = PyString_AsString(so); + PLy_exception_set_plural(PyExc_TypeError, + "Expected sequence of %d argument, got %d: %s", + "Expected sequence of %d arguments, got %d: %s", + plan->nargs, + plan->nargs, nargs, sv); + Py_DECREF(so); + + return NULL; + } + 
+ + if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL) + return NULL; + cursor->portal = NULL; + cursor->closed = false; + PLy_typeinfo_init(&cursor->result); + + oldcontext = CurrentMemoryContext; + oldowner = CurrentResourceOwner; + + BeginInternalSubTransaction(NULL); + MemoryContextSwitchTo(oldcontext); + + PG_TRY(); + { + Portal portal; + char *volatile nulls; + volatile int j; + + if (nargs > 0) + nulls = palloc(nargs * sizeof(char)); + else + nulls = NULL; + + for (j = 0; j < nargs; j++) + { + PyObject *elem; + + elem = PySequence_GetItem(args, j); + if (elem != Py_None) + { + PG_TRY(); + { + plan->values[j] = + plan->args[j].out.d.func(&(plan->args[j].out.d), + -1, + elem); + } + PG_CATCH(); + { + Py_DECREF(elem); + PG_RE_THROW(); + } + PG_END_TRY(); + + Py_DECREF(elem); + nulls[j] = ' '; + } + else + { + Py_DECREF(elem); + plan->values[j] = + InputFunctionCall(&(plan->args[j].out.d.typfunc), + NULL, + plan->args[j].out.d.typioparam, + -1); + nulls[j] = 'n'; + } + } + + portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls, + PLy_curr_procedure->fn_readonly); + if (portal == NULL) + elog(ERROR, "SPI_cursor_open() failed: %s", + SPI_result_code_string(SPI_result)); + + cursor->portal = portal; + + /* Commit the inner transaction, return to outer xact context */ + ReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* + * AtEOSubXact_SPI() should not have popped any SPI context, but just + * in case it did, make sure we remain connected.
+ */ + SPI_restore_connection(); + } + PG_CATCH(); + { + int k; + ErrorData *edata; + PLyExceptionEntry *entry; + PyObject *exc; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* cleanup plan->values array */ + for (k = 0; k < nargs; k++) + { + if (!plan->args[k].out.d.typbyval && + (plan->values[k] != PointerGetDatum(NULL))) + { + pfree(DatumGetPointer(plan->values[k])); + plan->values[k] = PointerGetDatum(NULL); + } + } + + /* Abort the inner transaction */ + RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + Py_DECREF(cursor); + + /* + * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will + * have left us in a disconnected state. We need this hack to return + * to connected state. + */ + SPI_restore_connection(); + + /* Look up the correct exception */ + entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), + HASH_FIND, NULL); + /* We really should find it, but just in case have a fallback */ + Assert(entry != NULL); + exc = entry ? 
entry->exc : PLy_exc_spi_error; + /* Make Python raise the exception */ + PLy_spi_exception_set(exc, edata); + return NULL; + } + PG_END_TRY(); + + for (i = 0; i < nargs; i++) + { + if (!plan->args[i].out.d.typbyval && + (plan->values[i] != PointerGetDatum(NULL))) + { + pfree(DatumGetPointer(plan->values[i])); + plan->values[i] = PointerGetDatum(NULL); + } + } + + Assert(cursor->portal != NULL); + return (PyObject *) cursor; + } + + static void + PLy_cursor_dealloc(PyObject *arg) + { + PLy_cursor_close(arg, NULL); + arg->ob_type->tp_free(arg); + } + + static PyObject * + PLy_cursor_iternext(PyObject *self) + { + PLyCursorObject *cursor; + PyObject *ret; + volatile MemoryContext oldcontext; + volatile ResourceOwner oldowner; + + cursor = (PLyCursorObject *) self; + + if (cursor->closed) + { + PLy_exception_set(PyExc_ValueError, "iterating a closed cursor"); + return NULL; + } + + oldcontext = CurrentMemoryContext; + oldowner = CurrentResourceOwner; + + BeginInternalSubTransaction(NULL); + MemoryContextSwitchTo(oldcontext); + + PG_TRY(); + { + + SPI_cursor_fetch(cursor->portal, true, 1); + if (SPI_processed == 0) + { + PyErr_SetNone(PyExc_StopIteration); + ret = NULL; + } + else + { + if (cursor->result.is_rowtype != 1) + PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc); + + ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0], + SPI_tuptable->tupdesc); + } + + SPI_freetuptable(SPI_tuptable); + + /* Commit the inner transaction, return to outer xact context */ + ReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* + * AtEOSubXact_SPI() should not have popped any SPI context, but just + * in case it did, make sure we remain connected. 
+ */ + SPI_restore_connection(); + } + PG_CATCH(); + { + ErrorData *edata; + PLyExceptionEntry *entry; + PyObject *exc; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Abort the inner transaction */ + RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + SPI_freetuptable(SPI_tuptable); + + /* + * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will + * have left us in a disconnected state. We need this hack to return + * to connected state. + */ + SPI_restore_connection(); + + /* Look up the correct exception */ + entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode, + HASH_FIND, NULL); + /* We really should find it, but just in case have a fallback */ + Assert(entry != NULL); + exc = entry ? entry->exc : PLy_exc_spi_error; + /* Make Python raise the exception */ + PLy_spi_exception_set(exc, edata); + return NULL; + } + PG_END_TRY(); + + return ret; + } + + static PyObject * + PLy_cursor_fetch(PyObject *self, PyObject *args) + { + PLyCursorObject *cursor; + int count; + PLyResultObject *ret; + volatile MemoryContext oldcontext; + volatile ResourceOwner oldowner; + + if (!PyArg_ParseTuple(args, "i", &count)) + return NULL; + + cursor = (PLyCursorObject *) self; + + if (cursor->closed) + { + PLy_exception_set(PyExc_ValueError, "fetch on a closed cursor"); + return NULL; + } + + ret = (PLyResultObject *) PLy_result_new(); + if (ret == NULL) + return NULL; + + oldcontext = CurrentMemoryContext; + oldowner = CurrentResourceOwner; + + BeginInternalSubTransaction(NULL); + MemoryContextSwitchTo(oldcontext); + + PG_TRY(); + { + SPI_cursor_fetch(cursor->portal, true, count); + + if (cursor->result.is_rowtype != 1) + PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc); + + Py_DECREF(ret->status); + ret->status = PyInt_FromLong(SPI_OK_FETCH); + + Py_DECREF(ret->nrows); + ret->nrows = PyInt_FromLong(SPI_processed); 
+ + if (SPI_processed != 0) + { + int i; + + Py_DECREF(ret->rows); + ret->rows = PyList_New(SPI_processed); + + for (i = 0; i < SPI_processed; i++) + { + PyObject *row = PLyDict_FromTuple(&cursor->result, + SPI_tuptable->vals[i], + SPI_tuptable->tupdesc); + PyList_SetItem(ret->rows, i, row); + } + } + + SPI_freetuptable(SPI_tuptable); + + /* Commit the inner transaction, return to outer xact context */ + ReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* + * AtEOSubXact_SPI() should not have popped any SPI context, but just + * in case it did, make sure we remain connected. + */ + SPI_restore_connection(); + } + PG_CATCH(); + { + ErrorData *edata; + PLyExceptionEntry *entry; + PyObject *exc; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Abort the inner transaction */ + RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + SPI_freetuptable(SPI_tuptable); + + /* + * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will + * have left us in a disconnected state. We need this hack to return + * to connected state. + */ + SPI_restore_connection(); + + /* Look up the correct exception */ + entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode, + HASH_FIND, NULL); + /* We really should find it, but just in case have a fallback */ + Assert(entry != NULL); + exc = entry ? 
entry->exc : PLy_exc_spi_error; + /* Make Python raise the exception */ + PLy_spi_exception_set(exc, edata); + return NULL; + } + PG_END_TRY(); + + return (PyObject *) ret; + } + + static PyObject * + PLy_cursor_close(PyObject *self, PyObject *unused) + { + PLyCursorObject *cursor = (PLyCursorObject *) self; + + if (!cursor->closed) + { + if (cursor->portal != NULL) + { + SPI_cursor_close(cursor->portal); + cursor->portal = NULL; + } + + PLy_typeinfo_dealloc(&cursor->result); + cursor->closed = true; + } + + Py_INCREF(Py_None); + return Py_None; + } + /* s = plpy.subtransaction() */ static PyObject * PLy_subtransaction(PyObject *self, PyObject *unused) *************** PLy_init_plpy(void) *** 4184,4189 **** --- 4787,4794 ---- elog(ERROR, "could not initialize PLy_ResultType"); if (PyType_Ready(&PLy_SubtransactionType) < 0) elog(ERROR, "could not initialize PLy_SubtransactionType"); + if (PyType_Ready(&PLy_CursorType) < 0) + elog(ERROR, "could not initialize PLy_CursorType"); #if PY_MAJOR_VERSION >= 3 PyModule_Create(&PLy_module); diff --git a/src/pl/plpython/sql/plpython_spi.sql b/src/pl/plpython/sql/plpython_spi.sql index 7f8f6a3..874b31e 100644 *** a/src/pl/plpython/sql/plpython_spi.sql --- b/src/pl/plpython/sql/plpython_spi.sql *************** else: *** 105,107 **** --- 105,223 ---- $$ LANGUAGE plpythonu; SELECT result_nrows_test(); + + + -- cursor objects + + CREATE FUNCTION simple_cursor_test() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + does = 0 + for row in res: + if row['lname'] == 'doe': + does += 1 + return does + $$ LANGUAGE plpythonu; + + CREATE FUNCTION double_cursor_close() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + res.close() + res.close() + $$ LANGUAGE plpythonu; + + CREATE FUNCTION cursor_fetch() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + assert len(res.fetch(3)) == 3 + assert len(res.fetch(3)) == 1 + assert len(res.fetch(3)) == 0 + assert len(res.fetch(3)) 
== 0 + try: + # use next() or __next__(), the method name changed in + # http://www.python.org/dev/peps/pep-3114/ + try: + res.next() + except AttributeError: + res.__next__() + except StopIteration: + pass + else: + assert False, "StopIteration not raised" + $$ LANGUAGE plpythonu; + + CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users order by fname") + assert len(res.fetch(2)) == 2 + + item = None + try: + item = res.next() + except AttributeError: + item = res.__next__() + assert item['fname'] == 'rick' + + assert len(res.fetch(2)) == 1 + $$ LANGUAGE plpythonu; + + CREATE FUNCTION fetch_after_close() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + res.close() + try: + res.fetch(1) + except ValueError: + pass + else: + assert False, "ValueError not raised" + $$ LANGUAGE plpythonu; + + CREATE FUNCTION next_after_close() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users") + res.close() + try: + try: + res.next() + except AttributeError: + res.__next__() + except ValueError: + pass + else: + assert False, "ValueError not raised" + $$ LANGUAGE plpythonu; + + CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$ + res = plpy.cursor("select fname, lname from users where false") + assert len(res.fetch(1)) == 0 + try: + try: + res.next() + except AttributeError: + res.__next__() + except StopIteration: + pass + else: + assert False, "StopIteration not raised" + $$ LANGUAGE plpythonu; + + CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$ + plan = plpy.prepare( + "select fname, lname from users where fname like $1 || '%' order by fname", + ["text"]) + for row in plpy.cursor(plan, ["w"]): + yield row['fname'] + for row in plpy.cursor(plan, ["j"]): + yield row['fname'] + $$ LANGUAGE plpythonu; + + CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$ + plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'", + ["text"]) + 
c = plpy.cursor(plan, ["a", "b"]) + $$ LANGUAGE plpythonu; + + SELECT simple_cursor_test(); + SELECT double_cursor_close(); + SELECT cursor_fetch(); + SELECT cursor_mix_next_and_fetch(); + SELECT fetch_after_close(); + SELECT next_after_close(); + SELECT cursor_fetch_next_empty(); + SELECT cursor_plan(); + SELECT cursor_plan_wrong_args(); -- 1.7.6.3
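As implemented in PLy_cursor_iternext, every step of the iterator interface starts an internal subtransaction and calls SPI_cursor_fetch for a single row. If that per-row cost matters, a caller can keep the convenience of a for loop while calling `fetch()` with a larger count, much as `count_odd_fetch` in the documentation does. The helper below is a hypothetical sketch: the name `iter_in_batches` and its default batch size are not part of the patch, and it assumes only what the documentation states, namely that `fetch(n)` returns a sequence that is empty once all rows are consumed. Whether it is measurably faster than plain iteration is an assumption that would need benchmarking.

```python
def iter_in_batches(cursor, batch_size=1000):
    """Yield rows one at a time, fetching `batch_size` rows per fetch() call.

    `cursor` is assumed to behave like the documented plpy.cursor object:
    fetch(n) returns at most n rows, and an empty result once exhausted.
    Hypothetical helper, not part of the patch.
    """
    if batch_size < 1:
        # Checked lazily: a generator body only runs on the first next().
        raise ValueError("batch_size must be at least 1")
    while True:
        rows = cursor.fetch(batch_size)
        if not rows:  # empty batch: the result set is exhausted
            return
        for row in rows:
            yield row
```

The sketch avoids annotations so it would also be valid Python 2 syntax. Defined inside a PL/Python function body, it could be used as `for row in iter_in_batches(plpy.cursor("select num from largetable"), 500):`, trading one subtransaction per row for one per batch, at the cost of holding up to `batch_size` rows in memory at a time.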
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers