Hi,

attached is a patch implementing the usage of SPI cursors in PL/Python.
Currently when trying to process a large table in PL/Python you have
slurp it all into memory (that's what plpy.execute does).

This patch allows reading the result set in smaller chunks, using a SPI
cursor behind the scenes.

Example usage:

cursor = plpy.cursor("select a, b from hugetable")
for row in cursor:
    plpy.info("a is %s and b is %s" % (row['a'], row['b']))

The patch itself is simple, but there's a lot of boilerplate dedicated
to opening a subtransaction and handling prepared plans. I'd like to do
some refactoring of they way PL/Python uses SPI to reduce the amount of
boilerplate needed, but that'll come as a separate patch (just before
the patch to split plpython.c in smaller chunks).

This feature has been sponsored by Nomao.

Cheers,
Jan

PS: I already added it to the November CF.

J
>From 9ad14957e7b4ae19667df3bb8cc2aa5ef5bf96c8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Urba=C5=84ski?= <wulc...@wulczer.org>
Date: Tue, 13 Sep 2011 14:42:41 +0200
Subject: [PATCH] Add cursor support to plpythonu.

Exposes SPI cursors as plpythonu objects allowing processing large
result sets without loading them entirely into memory, as plpy.execute
is doing.
---
 doc/src/sgml/plpython.sgml                 |   80 ++++
 src/pl/plpython/expected/plpython_spi.out  |  151 +++++++
 src/pl/plpython/expected/plpython_test.out |    6 +-
 src/pl/plpython/plpython.c                 |  605 ++++++++++++++++++++++++++++
 src/pl/plpython/sql/plpython_spi.sql       |  116 ++++++
 5 files changed, 955 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml
index eda2bbf..d08c3d1 100644
*** a/doc/src/sgml/plpython.sgml
--- b/doc/src/sgml/plpython.sgml
*************** $$ LANGUAGE plpythonu;
*** 892,897 ****
--- 892,906 ----
    </para>
  
    <para>
+     Note that calling <literal>plpy.execute</literal> will cause the entire
+     result set to be read into memory. Only use that function when you are sure
+     that the result set will be relatively small.  If you don't want to risk
+     excessive memory usage when fetching large results,
+     use <literal>plpy.cursor</literal> rather
+     than <literal>plpy.execute</literal>.
+   </para>
+ 
+   <para>
     For example:
  <programlisting>
  rv = plpy.execute("SELECT * FROM my_table", 5)
*************** $$ LANGUAGE plpythonu;
*** 958,963 ****
--- 967,1043 ----
  
    </sect2>
  
+   <sect2>
+     <title>Accessing data with cursors</title>
+ 
+   <para>
+     The <literal>plpy.cursor</literal> function accepts the same arguments
+     as <literal>plpy.execute</literal> (except for <literal>limit</literal>)
+     and returns a cursor object, which allows you to process large result sets
+     in smaller chunks.  As with <literal>plpy.execute</literal>, either a query
+     string or a plan object along with a list of arguments can be used.  The
+     cursor object provides a <literal>fetch</literal> method that accepts an
+     integer paramter and returns a result object.  Each time you
+     call <literal>fetch</literal>, the returned object will contain the next
+     batch of rows, never larger than the parameter value.  Once all rows are
+     exhausted, <literal>fetch</literal> starts returning an empty result
+     object.  Cursor objects also provide an
+     <ulink url="http://docs.python.org/library/stdtypes.html#iterator-types";>iterator
+     interface</ulink>, yielding one row at a time until all rows are exhausted.
+     Data fetched that way is not returned as result objects, but rather as
+     dictionaries, each dictionary corresponding to a single result row.
+   </para>
+ 
+   <para>
+     Cursors are automatically disposed of, but if you want to explicitly
+     release all resources held by a cursor, use the <literal>close</literal>
+     method.  Once closed, a cursor cannot be fetched from anymore.
+   </para>
+ 
+    <note>
+     <para>
+       Do not confuse objects created by <literal>plpy.cursor</literal> with
+       DBAPI cursors as defined by
+       the <ulink url="http://www.python.org/dev/peps/pep-0249/";>Python Database API specification</ulink>.
+       They don't have anything in common except for the name.
+     </para>
+    </note>
+ 
+   <para>
+     An example of two ways of processing data from a large table would be:
+ <programlisting>
+ CREATE FUNCTION count_odd_iterator() RETURNS integer AS $$
+ odd = 0
+ for row in plpy.cursor("select num from largetable"):
+     if row['num'] % 2:
+          odd += 1
+ return odd
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION count_odd_fetch(batch_size integer) RETURNS integer AS $$
+ odd = 0
+ cursor = plpy.cursor("select num from largetable")
+ while True:
+     rows = cursor.fetch(batch_size)
+     if not rows:
+         break
+     for row in rows:
+         if row['num'] % 2:
+             odd += 1
+ return odd
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION count_odd_prepared() RETURNS integer AS $$
+ odd = 0
+ plan = plpy.prepare("select num from largetable where num % $1 != 0", ["integer"])
+ rows = list(plpy.cursor(plan, [2]))
+ 
+ return len(rows)
+ $$ LANGUAGE plpythonu;
+ </programlisting>
+   </para>
+   </sect2>
+ 
    <sect2 id="plpython-trapping">
     <title>Trapping Errors</title>
  
diff --git a/src/pl/plpython/expected/plpython_spi.out b/src/pl/plpython/expected/plpython_spi.out
index 7f4ae5c..3b4d7a3 100644
*** a/src/pl/plpython/expected/plpython_spi.out
--- b/src/pl/plpython/expected/plpython_spi.out
*************** CONTEXT:  PL/Python function "result_nro
*** 133,135 ****
--- 133,286 ----
                   2
  (1 row)
  
+ -- cursor objects
+ CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ does = 0
+ for row in res:
+     if row['lname'] == 'doe':
+         does += 1
+ return does
+ $$ LANGUAGE plpythonu;
+ CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ res.close()
+ res.close()
+ $$ LANGUAGE plpythonu;
+ CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ assert len(res.fetch(3)) == 3
+ assert len(res.fetch(3)) == 1
+ assert len(res.fetch(3)) == 0
+ assert len(res.fetch(3)) == 0
+ try:
+     # use next() or __next__(), the method name changed in
+     # http://www.python.org/dev/peps/pep-3114/
+     try:
+         res.next()
+     except AttributeError:
+         res.__next__()
+ except StopIteration:
+     pass
+ else:
+     assert False, "StopIteration not raised"
+ $$ LANGUAGE plpythonu;
+ CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users order by fname")
+ assert len(res.fetch(2)) == 2
+ 
+ item = None
+ try:
+     item = res.next()
+ except AttributeError:
+     item = res.__next__()
+ assert item['fname'] == 'rick'
+ 
+ assert len(res.fetch(2)) == 1
+ $$ LANGUAGE plpythonu;
+ CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ res.close()
+ try:
+     res.fetch(1)
+ except ValueError:
+     pass
+ else:
+     assert False, "ValueError not raised"
+ $$ LANGUAGE plpythonu;
+ CREATE FUNCTION next_after_close() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ res.close()
+ try:
+     try:
+         res.next()
+     except AttributeError:
+         res.__next__()
+ except ValueError:
+     pass
+ else:
+     assert False, "ValueError not raised"
+ $$ LANGUAGE plpythonu;
+ CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users where false")
+ assert len(res.fetch(1)) == 0
+ try:
+     try:
+         res.next()
+     except AttributeError:
+         res.__next__()
+ except StopIteration:
+     pass
+ else:
+     assert False, "StopIteration not raised"
+ $$ LANGUAGE plpythonu;
+ CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+ plan = plpy.prepare(
+     "select fname, lname from users where fname like $1 || '%' order by fname",
+     ["text"])
+ for row in plpy.cursor(plan, ["w"]):
+     yield row['fname']
+ for row in plpy.cursor(plan, ["j"]):
+     yield row['fname']
+ $$ LANGUAGE plpythonu;
+ CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+ plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+                     ["text"])
+ c = plpy.cursor(plan, ["a", "b"])
+ $$ LANGUAGE plpythonu;
+ SELECT simple_cursor_test();
+  simple_cursor_test 
+ --------------------
+                   3
+ (1 row)
+ 
+ SELECT double_cursor_close();
+  double_cursor_close 
+ ---------------------
+                     
+ (1 row)
+ 
+ SELECT cursor_fetch();
+  cursor_fetch 
+ --------------
+              
+ (1 row)
+ 
+ SELECT cursor_mix_next_and_fetch();
+  cursor_mix_next_and_fetch 
+ ---------------------------
+                           
+ (1 row)
+ 
+ SELECT fetch_after_close();
+  fetch_after_close 
+ -------------------
+                   
+ (1 row)
+ 
+ SELECT next_after_close();
+  next_after_close 
+ ------------------
+                  
+ (1 row)
+ 
+ SELECT cursor_fetch_next_empty();
+  cursor_fetch_next_empty 
+ -------------------------
+                         
+ (1 row)
+ 
+ SELECT cursor_plan();
+  cursor_plan 
+ -------------
+  willem
+  jane
+  john
+ (3 rows)
+ 
+ SELECT cursor_plan_wrong_args();
+ ERROR:  TypeError: Expected sequence of 1 argument, got 2: ['a', 'b']
+ CONTEXT:  Traceback (most recent call last):
+   PL/Python function "cursor_plan_wrong_args", line 4, in <module>
+     c = plpy.cursor(plan, ["a", "b"])
+ PL/Python function "cursor_plan_wrong_args"
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index f2dda66..a884fc0 100644
*** a/src/pl/plpython/expected/plpython_test.out
--- b/src/pl/plpython/expected/plpython_test.out
*************** contents.sort()
*** 43,51 ****
  return ", ".join(contents)
  $$ LANGUAGE plpythonu;
  select module_contents();
!                                                                            module_contents                                                                            
! ----------------------------------------------------------------------------------------------------------------------------------------------------------------------
!  Error, Fatal, SPIError, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
  (1 row)
  
  CREATE FUNCTION elog_test() RETURNS void
--- 43,51 ----
  return ", ".join(contents)
  $$ LANGUAGE plpythonu;
  select module_contents();
!                                                                                module_contents                                                                                
! ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
!  Error, Fatal, SPIError, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
  (1 row)
  
  CREATE FUNCTION elog_test() RETURNS void
diff --git a/src/pl/plpython/plpython.c b/src/pl/plpython/plpython.c
index 93e8043..d56b2d7 100644
*** a/src/pl/plpython/plpython.c
--- b/src/pl/plpython/plpython.c
*************** typedef int Py_ssize_t;
*** 134,139 ****
--- 134,144 ----
  		PyObject_HEAD_INIT(type) size,
  #endif
  
+ /* Python 3 removed the Py_TPFLAGS_HAVE_ITER flag */
+ #if PY_MAJOR_VERSION >= 3
+ #define Py_TPFLAGS_HAVE_ITER 0
+ #endif
+ 
  /* define our text domain for translations */
  #undef TEXTDOMAIN
  #define TEXTDOMAIN PG_TEXTDOMAIN("plpython")
*************** typedef struct PLySubtransactionObject
*** 310,315 ****
--- 315,328 ----
  	bool		exited;
  } PLySubtransactionObject;
  
+ typedef struct PLyCursorObject
+ {
+ 	PyObject_HEAD
+ 	Portal		portal;
+ 	PLyTypeInfo result;
+ 	bool		closed;
+ } PLyCursorObject;
+ 
  /* A list of all known exceptions, generated from backend/utils/errcodes.txt */
  typedef struct ExceptionMap
  {
*************** static char PLy_subtransaction_doc[] = {
*** 486,491 ****
--- 499,508 ----
  	"PostgreSQL subtransaction context manager"
  };
  
+ static char PLy_cursor_doc[] = {
+ 	"Wrapper around a PostgreSQL cursor"
+ };
+ 
  
  /*
   * the function definitions
*************** static void PLy_subtransaction_dealloc(P
*** 2963,2968 ****
--- 2980,2993 ----
  static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *);
  static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *);
  
+ static PyObject *PLy_cursor(PyObject *, PyObject *);
+ static PyObject *PLy_cursor_query(char *);
+ static PyObject *PLy_cursor_plan(PyObject *, PyObject *);
+ static void PLy_cursor_dealloc(PyObject *);
+ static PyObject *PLy_cursor_iternext(PyObject *);
+ static PyObject *PLy_cursor_fetch(PyObject *, PyObject *);
+ static PyObject *PLy_cursor_close(PyObject *, PyObject *);
+ 
  
  static PyMethodDef PLy_plan_methods[] = {
  	{"status", PLy_plan_status, METH_VARARGS, NULL},
*************** static PyTypeObject PLy_SubtransactionTy
*** 3099,3104 ****
--- 3124,3170 ----
  	PLy_subtransaction_methods, /* tp_tpmethods */
  };
  
+ static PyMethodDef PLy_cursor_methods[] = {
+ 	{"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
+ 	{"close", PLy_cursor_close, METH_NOARGS, NULL},
+ 	{NULL, NULL, 0, NULL}
+ };
+ 
+ static PyTypeObject PLy_CursorType = {
+ 	PyVarObject_HEAD_INIT(NULL, 0)
+ 	"PLyCursor",		/* tp_name */
+ 	sizeof(PLyCursorObject),	/* tp_size */
+ 	0,							/* tp_itemsize */
+ 
+ 	/*
+ 	 * methods
+ 	 */
+ 	PLy_cursor_dealloc, 		/* tp_dealloc */
+ 	0,							/* tp_print */
+ 	0,							/* tp_getattr */
+ 	0,							/* tp_setattr */
+ 	0,							/* tp_compare */
+ 	0,							/* tp_repr */
+ 	0,							/* tp_as_number */
+ 	0,							/* tp_as_sequence */
+ 	0,							/* tp_as_mapping */
+ 	0,							/* tp_hash */
+ 	0,							/* tp_call */
+ 	0,							/* tp_str */
+ 	0,							/* tp_getattro */
+ 	0,							/* tp_setattro */
+ 	0,							/* tp_as_buffer */
+ 	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,	/* tp_flags */
+ 	PLy_cursor_doc,				/* tp_doc */
+ 	0,							/* tp_traverse */
+ 	0,							/* tp_clear */
+ 	0,							/* tp_richcompare */
+ 	0,							/* tp_weaklistoffset */
+ 	PyObject_SelfIter,			/* tp_iter */
+ 	PLy_cursor_iternext,		/* tp_iternext */
+ 	PLy_cursor_methods,			/* tp_tpmethods */
+ };
+ 
  static PyMethodDef PLy_methods[] = {
  	/*
  	 * logging methods
*************** static PyMethodDef PLy_methods[] = {
*** 3133,3138 ****
--- 3199,3209 ----
  	 */
  	{"subtransaction", PLy_subtransaction, METH_NOARGS, NULL},
  
+ 	/*
+ 	 * create a cursor
+ 	 */
+ 	{"cursor", PLy_cursor, METH_VARARGS, NULL},
+ 
  	{NULL, NULL, 0, NULL}
  };
  
*************** PLy_spi_execute_fetch_result(SPITupleTab
*** 3833,3838 ****
--- 3904,4441 ----
  	return (PyObject *) result;
  }
  
+ /*
+  * c = plpy.cursor("select * from largetable")
+  * c = plpy.cursor(plan, [])
+  */
+ static PyObject *
+ PLy_cursor(PyObject *self, PyObject *args)
+ {
+ 	char			*query;
+ 	PyObject		*plan;
+ 	PyObject   		*planargs = NULL;
+ 
+ 	if (PyArg_ParseTuple(args, "s", &query))
+ 		return PLy_cursor_query(query);
+ 
+ 	PyErr_Clear();
+ 
+ 	if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
+ 		return PLy_cursor_plan(plan, planargs);
+ 
+ 	PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
+ 	return NULL;
+ }
+ 
+ static PyObject *
+ PLy_cursor_query(char *query)
+ {
+ 	PLyCursorObject	*cursor;
+ 	volatile MemoryContext oldcontext;
+ 	volatile ResourceOwner oldowner;
+ 
+ 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+ 		return NULL;
+ 	cursor->portal = NULL;
+ 	cursor->closed = false;
+ 	PLy_typeinfo_init(&cursor->result);
+ 
+ 	oldcontext = CurrentMemoryContext;
+ 	oldowner = CurrentResourceOwner;
+ 
+ 	BeginInternalSubTransaction(NULL);
+ 	MemoryContextSwitchTo(oldcontext);
+ 
+ 	PG_TRY();
+ 	{
+ 		SPIPlanPtr	plan;
+ 		Portal		portal;
+ 
+ 		pg_verifymbstr(query, strlen(query), false);
+ 
+ 		plan = SPI_prepare(query, 0, NULL);
+ 		if (plan == NULL)
+ 			elog(ERROR, "SPI_prepare failed: %s",
+ 				 SPI_result_code_string(SPI_result));
+ 
+ 		portal = SPI_cursor_open(NULL, plan, NULL, NULL,
+ 								 PLy_curr_procedure->fn_readonly);
+ 		SPI_freeplan(plan);
+ 
+ 		if (portal == NULL)
+ 			elog(ERROR, "SPI_cursor_open() failed:%s",
+ 				 SPI_result_code_string(SPI_result));
+ 
+ 		cursor->portal = portal;
+ 
+ 		/* Commit the inner transaction, return to outer xact context */
+ 		ReleaseCurrentSubTransaction();
+ 		MemoryContextSwitchTo(oldcontext);
+ 		CurrentResourceOwner = oldowner;
+ 
+ 		/*
+ 		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ 		 * in case it did, make sure we remain connected.
+ 		 */
+ 		SPI_restore_connection();
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		ErrorData  *edata;
+ 		PLyExceptionEntry *entry;
+ 		PyObject   *exc;
+ 
+ 		/* Save error info */
+ 		MemoryContextSwitchTo(oldcontext);
+ 		edata = CopyErrorData();
+ 		FlushErrorState();
+ 
+ 		/* Abort the inner transaction */
+ 		RollbackAndReleaseCurrentSubTransaction();
+ 		MemoryContextSwitchTo(oldcontext);
+ 		CurrentResourceOwner = oldowner;
+ 
+ 		Py_DECREF(cursor);
+ 
+ 		/*
+ 		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ 		 * have left us in a disconnected state.  We need this hack to return
+ 		 * to connected state.
+ 		 */
+ 		SPI_restore_connection();
+ 
+ 		/* Look up the correct exception */
+ 		entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+ 							HASH_FIND, NULL);
+ 		/* We really should find it, but just in case have a fallback */
+ 		Assert(entry != NULL);
+ 		exc = entry ? entry->exc : PLy_exc_spi_error;
+ 		/* Make Python raise the exception */
+ 		PLy_spi_exception_set(exc, edata);
+ 		return NULL;
+ 	}
+ 	PG_END_TRY();
+ 
+ 	Assert(cursor->portal != NULL);
+ 	return (PyObject *) cursor;
+ }
+ 
+ static PyObject *
+ PLy_cursor_plan(PyObject *ob, PyObject *args)
+ {
+ 	PLyCursorObject	*cursor;
+ 	volatile int nargs;
+ 	int			i;
+ 	PLyPlanObject *plan;
+ 	volatile MemoryContext oldcontext;
+ 	volatile ResourceOwner oldowner;
+ 
+ 	if (args != NULL)
+ 	{
+ 		if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
+ 		{
+ 			PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
+ 			return NULL;
+ 		}
+ 		nargs = PySequence_Length(args);
+ 	}
+ 	else
+ 		nargs = 0;
+ 
+ 	plan = (PLyPlanObject *) ob;
+ 
+ 	if (nargs != plan->nargs)
+ 	{
+ 		char	   *sv;
+ 		PyObject   *so = PyObject_Str(args);
+ 
+ 		if (!so)
+ 			PLy_elog(ERROR, "could not execute plan");
+ 		sv = PyString_AsString(so);
+ 		PLy_exception_set_plural(PyExc_TypeError,
+ 								 "Expected sequence of %d argument, got %d: %s",
+ 								 "Expected sequence of %d arguments, got %d: %s",
+ 								 plan->nargs,
+ 								 plan->nargs, nargs, sv);
+ 		Py_DECREF(so);
+ 
+ 		return NULL;
+ 	}
+ 
+ 	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+ 		return NULL;
+ 	cursor->portal = NULL;
+ 	cursor->closed = false;
+ 	PLy_typeinfo_init(&cursor->result);
+ 
+ 	oldcontext = CurrentMemoryContext;
+ 	oldowner = CurrentResourceOwner;
+ 
+ 	BeginInternalSubTransaction(NULL);
+ 	MemoryContextSwitchTo(oldcontext);
+ 
+ 	PG_TRY();
+ 	{
+ 		Portal		portal;
+ 		char	   *volatile nulls;
+ 		volatile int j;
+ 
+ 		if (nargs > 0)
+ 			nulls = palloc(nargs * sizeof(char));
+ 		else
+ 			nulls = NULL;
+ 
+ 		for (j = 0; j < nargs; j++)
+ 		{
+ 			PyObject   *elem;
+ 
+ 			elem = PySequence_GetItem(args, j);
+ 			if (elem != Py_None)
+ 			{
+ 				PG_TRY();
+ 				{
+ 					plan->values[j] =
+ 						plan->args[j].out.d.func(&(plan->args[j].out.d),
+ 												 -1,
+ 												 elem);
+ 				}
+ 				PG_CATCH();
+ 				{
+ 					Py_DECREF(elem);
+ 					PG_RE_THROW();
+ 				}
+ 				PG_END_TRY();
+ 
+ 				Py_DECREF(elem);
+ 				nulls[j] = ' ';
+ 			}
+ 			else
+ 			{
+ 				Py_DECREF(elem);
+ 				plan->values[j] =
+ 					InputFunctionCall(&(plan->args[j].out.d.typfunc),
+ 									  NULL,
+ 									  plan->args[j].out.d.typioparam,
+ 									  -1);
+ 				nulls[j] = 'n';
+ 			}
+ 		}
+ 
+ 		portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
+ 								 PLy_curr_procedure->fn_readonly);
+ 		if (portal == NULL)
+ 			elog(ERROR, "SPI_cursor_open() failed:%s",
+ 				 SPI_result_code_string(SPI_result));
+ 
+ 		cursor->portal = portal;
+ 
+ 		/* Commit the inner transaction, return to outer xact context */
+ 		ReleaseCurrentSubTransaction();
+ 		MemoryContextSwitchTo(oldcontext);
+ 		CurrentResourceOwner = oldowner;
+ 
+ 		/*
+ 		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ 		 * in case it did, make sure we remain connected.
+ 		 */
+ 		SPI_restore_connection();
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		int			k;
+ 		ErrorData  *edata;
+ 		PLyExceptionEntry *entry;
+ 		PyObject   *exc;
+ 
+ 		/* Save error info */
+ 		MemoryContextSwitchTo(oldcontext);
+ 		edata = CopyErrorData();
+ 		FlushErrorState();
+ 
+ 		/* cleanup plan->values array */
+ 		for (k = 0; k < nargs; k++)
+ 		{
+ 			if (!plan->args[k].out.d.typbyval &&
+ 				(plan->values[k] != PointerGetDatum(NULL)))
+ 			{
+ 				pfree(DatumGetPointer(plan->values[k]));
+ 				plan->values[k] = PointerGetDatum(NULL);
+ 			}
+ 		}
+ 
+ 		/* Abort the inner transaction */
+ 		RollbackAndReleaseCurrentSubTransaction();
+ 		MemoryContextSwitchTo(oldcontext);
+ 		CurrentResourceOwner = oldowner;
+ 
+ 		Py_DECREF(cursor);
+ 
+ 		/*
+ 		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ 		 * have left us in a disconnected state.  We need this hack to return
+ 		 * to connected state.
+ 		 */
+ 		SPI_restore_connection();
+ 
+ 		/* Look up the correct exception */
+ 		entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+ 							HASH_FIND, NULL);
+ 		/* We really should find it, but just in case have a fallback */
+ 		Assert(entry != NULL);
+ 		exc = entry ? entry->exc : PLy_exc_spi_error;
+ 		/* Make Python raise the exception */
+ 		PLy_spi_exception_set(exc, edata);
+ 		return NULL;
+ 	}
+ 	PG_END_TRY();
+ 
+ 	for (i = 0; i < nargs; i++)
+ 	{
+ 		if (!plan->args[i].out.d.typbyval &&
+ 			(plan->values[i] != PointerGetDatum(NULL)))
+ 		{
+ 			pfree(DatumGetPointer(plan->values[i]));
+ 			plan->values[i] = PointerGetDatum(NULL);
+ 		}
+ 	}
+ 
+ 	Assert(cursor->portal != NULL);
+ 	return (PyObject *) cursor;
+ }
+ 
+ static void
+ PLy_cursor_dealloc(PyObject *arg)
+ {
+ 	PLy_cursor_close(arg, NULL);
+ 	arg->ob_type->tp_free(arg);
+ }
+ 
+ static PyObject *
+ PLy_cursor_iternext(PyObject *self)
+ {
+ 	PLyCursorObject *cursor;
+ 	PyObject		*ret;
+ 	volatile MemoryContext oldcontext;
+ 	volatile ResourceOwner oldowner;
+ 
+ 	cursor = (PLyCursorObject *) self;
+ 
+ 	if (cursor->closed)
+ 	{
+ 		PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
+ 		return NULL;
+ 	}
+ 
+ 	oldcontext = CurrentMemoryContext;
+ 	oldowner = CurrentResourceOwner;
+ 
+ 	BeginInternalSubTransaction(NULL);
+ 	MemoryContextSwitchTo(oldcontext);
+ 
+ 	PG_TRY();
+ 	{
+ 
+ 		SPI_cursor_fetch(cursor->portal, true, 1);
+ 		if (SPI_processed == 0)
+ 		{
+ 			PyErr_SetNone(PyExc_StopIteration);
+ 			ret = NULL;
+ 		}
+ 		else
+ 		{
+ 			if (cursor->result.is_rowtype != 1)
+ 				PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+ 
+ 			ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
+ 									SPI_tuptable->tupdesc);
+ 		}
+ 
+ 		SPI_freetuptable(SPI_tuptable);
+ 
+ 		/* Commit the inner transaction, return to outer xact context */
+ 		ReleaseCurrentSubTransaction();
+ 		MemoryContextSwitchTo(oldcontext);
+ 		CurrentResourceOwner = oldowner;
+ 
+ 		/*
+ 		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ 		 * in case it did, make sure we remain connected.
+ 		 */
+ 		SPI_restore_connection();
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		ErrorData  *edata;
+ 		PLyExceptionEntry *entry;
+ 		PyObject   *exc;
+ 
+ 		/* Save error info */
+ 		MemoryContextSwitchTo(oldcontext);
+ 		edata = CopyErrorData();
+ 		FlushErrorState();
+ 
+ 		/* Abort the inner transaction */
+ 		RollbackAndReleaseCurrentSubTransaction();
+ 		MemoryContextSwitchTo(oldcontext);
+ 		CurrentResourceOwner = oldowner;
+ 
+ 		SPI_freetuptable(SPI_tuptable);
+ 
+ 		/*
+ 		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ 		 * have left us in a disconnected state.  We need this hack to return
+ 		 * to connected state.
+ 		 */
+ 		SPI_restore_connection();
+ 
+ 		/* Look up the correct exception */
+ 		entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+ 							HASH_FIND, NULL);
+ 		/* We really should find it, but just in case have a fallback */
+ 		Assert(entry != NULL);
+ 		exc = entry ? entry->exc : PLy_exc_spi_error;
+ 		/* Make Python raise the exception */
+ 		PLy_spi_exception_set(exc, edata);
+ 		return NULL;
+ 	}
+ 	PG_END_TRY();
+ 
+ 	return ret;
+ }
+ 
+ static PyObject *
+ PLy_cursor_fetch(PyObject *self, PyObject *args)
+ {
+ 	PLyCursorObject *cursor;
+ 	int				count;
+ 	PLyResultObject	*ret;
+ 	volatile MemoryContext oldcontext;
+ 	volatile ResourceOwner oldowner;
+ 
+ 	if (!PyArg_ParseTuple(args, "i", &count))
+ 		return NULL;
+ 
+ 	cursor = (PLyCursorObject *) self;
+ 
+ 	if (cursor->closed)
+ 	{
+ 		PLy_exception_set(PyExc_ValueError, "fetch on a closed cursor");
+ 		return NULL;
+ 	}
+ 
+ 	ret = (PLyResultObject *) PLy_result_new();
+ 	if (ret == NULL)
+ 		return NULL;
+ 
+ 	oldcontext = CurrentMemoryContext;
+ 	oldowner = CurrentResourceOwner;
+ 
+ 	BeginInternalSubTransaction(NULL);
+ 	MemoryContextSwitchTo(oldcontext);
+ 
+ 	PG_TRY();
+ 	{
+ 		SPI_cursor_fetch(cursor->portal, true, count);
+ 
+ 		if (cursor->result.is_rowtype != 1)
+ 			PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+ 
+ 		Py_DECREF(ret->status);
+ 		ret->status = PyInt_FromLong(SPI_OK_FETCH);
+ 
+ 		Py_DECREF(ret->nrows);
+ 		ret->nrows = PyInt_FromLong(SPI_processed);
+ 
+ 		if (SPI_processed != 0)
+ 		{
+ 			int	i;
+ 
+ 			Py_DECREF(ret->rows);
+ 			ret->rows = PyList_New(SPI_processed);
+ 
+ 			for (i = 0; i < SPI_processed; i++)
+ 			{
+ 				PyObject   *row = PLyDict_FromTuple(&cursor->result,
+ 													SPI_tuptable->vals[i],
+ 													SPI_tuptable->tupdesc);
+ 				PyList_SetItem(ret->rows, i, row);
+ 			}
+ 		}
+ 
+ 		SPI_freetuptable(SPI_tuptable);
+ 
+ 		/* Commit the inner transaction, return to outer xact context */
+ 		ReleaseCurrentSubTransaction();
+ 		MemoryContextSwitchTo(oldcontext);
+ 		CurrentResourceOwner = oldowner;
+ 
+ 		/*
+ 		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ 		 * in case it did, make sure we remain connected.
+ 		 */
+ 		SPI_restore_connection();
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		ErrorData  *edata;
+ 		PLyExceptionEntry *entry;
+ 		PyObject   *exc;
+ 
+ 		/* Save error info */
+ 		MemoryContextSwitchTo(oldcontext);
+ 		edata = CopyErrorData();
+ 		FlushErrorState();
+ 
+ 		/* Abort the inner transaction */
+ 		RollbackAndReleaseCurrentSubTransaction();
+ 		MemoryContextSwitchTo(oldcontext);
+ 		CurrentResourceOwner = oldowner;
+ 
+ 		SPI_freetuptable(SPI_tuptable);
+ 
+ 		/*
+ 		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ 		 * have left us in a disconnected state.  We need this hack to return
+ 		 * to connected state.
+ 		 */
+ 		SPI_restore_connection();
+ 
+ 		/* Look up the correct exception */
+ 		entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+ 							HASH_FIND, NULL);
+ 		/* We really should find it, but just in case have a fallback */
+ 		Assert(entry != NULL);
+ 		exc = entry ? entry->exc : PLy_exc_spi_error;
+ 		/* Make Python raise the exception */
+ 		PLy_spi_exception_set(exc, edata);
+ 		return NULL;
+ 	}
+ 	PG_END_TRY();
+ 
+ 	return (PyObject *) ret;
+ }
+ 
+ static PyObject *
+ PLy_cursor_close(PyObject *self, PyObject *unused)
+ {
+ 	PLyCursorObject *cursor = (PLyCursorObject *) self;
+ 
+ 	if (!cursor->closed)
+ 	{
+ 		if (cursor->portal != NULL)
+ 		{
+ 			SPI_cursor_close(cursor->portal);
+ 			cursor->portal = NULL;
+ 		}
+ 
+ 		PLy_typeinfo_dealloc(&cursor->result);
+ 		cursor->closed = true;
+ 	}
+ 
+ 	Py_INCREF(Py_None);
+ 	return Py_None;
+ }
+ 
  /* s = plpy.subtransaction() */
  static PyObject *
  PLy_subtransaction(PyObject *self, PyObject *unused)
*************** PLy_init_plpy(void)
*** 4184,4189 ****
--- 4787,4794 ----
  		elog(ERROR, "could not initialize PLy_ResultType");
  	if (PyType_Ready(&PLy_SubtransactionType) < 0)
  		elog(ERROR, "could not initialize PLy_SubtransactionType");
+ 	if (PyType_Ready(&PLy_CursorType) < 0)
+ 		elog(ERROR, "could not initialize PLy_CursorType");
  
  #if PY_MAJOR_VERSION >= 3
  	PyModule_Create(&PLy_module);
diff --git a/src/pl/plpython/sql/plpython_spi.sql b/src/pl/plpython/sql/plpython_spi.sql
index 7f8f6a3..874b31e 100644
*** a/src/pl/plpython/sql/plpython_spi.sql
--- b/src/pl/plpython/sql/plpython_spi.sql
*************** else:
*** 105,107 ****
--- 105,223 ----
  $$ LANGUAGE plpythonu;
  
  SELECT result_nrows_test();
+ 
+ 
+ -- cursor objects
+ 
+ CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ does = 0
+ for row in res:
+     if row['lname'] == 'doe':
+         does += 1
+ return does
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ res.close()
+ res.close()
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ assert len(res.fetch(3)) == 3
+ assert len(res.fetch(3)) == 1
+ assert len(res.fetch(3)) == 0
+ assert len(res.fetch(3)) == 0
+ try:
+     # use next() or __next__(), the method name changed in
+     # http://www.python.org/dev/peps/pep-3114/
+     try:
+         res.next()
+     except AttributeError:
+         res.__next__()
+ except StopIteration:
+     pass
+ else:
+     assert False, "StopIteration not raised"
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users order by fname")
+ assert len(res.fetch(2)) == 2
+ 
+ item = None
+ try:
+     item = res.next()
+ except AttributeError:
+     item = res.__next__()
+ assert item['fname'] == 'rick'
+ 
+ assert len(res.fetch(2)) == 1
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ res.close()
+ try:
+     res.fetch(1)
+ except ValueError:
+     pass
+ else:
+     assert False, "ValueError not raised"
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION next_after_close() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users")
+ res.close()
+ try:
+     try:
+         res.next()
+     except AttributeError:
+         res.__next__()
+ except ValueError:
+     pass
+ else:
+     assert False, "ValueError not raised"
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+ res = plpy.cursor("select fname, lname from users where false")
+ assert len(res.fetch(1)) == 0
+ try:
+     try:
+         res.next()
+     except AttributeError:
+         res.__next__()
+ except StopIteration:
+     pass
+ else:
+     assert False, "StopIteration not raised"
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+ plan = plpy.prepare(
+     "select fname, lname from users where fname like $1 || '%' order by fname",
+     ["text"])
+ for row in plpy.cursor(plan, ["w"]):
+     yield row['fname']
+ for row in plpy.cursor(plan, ["j"]):
+     yield row['fname']
+ $$ LANGUAGE plpythonu;
+ 
+ CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+ plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+                     ["text"])
+ c = plpy.cursor(plan, ["a", "b"])
+ $$ LANGUAGE plpythonu;
+ 
+ SELECT simple_cursor_test();
+ SELECT double_cursor_close();
+ SELECT cursor_fetch();
+ SELECT cursor_mix_next_and_fetch();
+ SELECT fetch_after_close();
+ SELECT next_after_close();
+ SELECT cursor_fetch_next_empty();
+ SELECT cursor_plan();
+ SELECT cursor_plan_wrong_args();
-- 
1.7.6.3

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to