I submitted a patch a while ago to support asynchronous connections and
queries (Ticket #49). At roughly the same time, D'Arcy started checking
in changes to support Python 3, so the patch didn't work any more. I've
had my head under water since then so I haven't had a chance to get back
to it, but here's a new patch that was recently generated against head
and should apply and build cleanly.
Looking at it now, I seem to be missing unit tests. I'll see what
happened to them and send those in later.
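
For anyone reviewing without a database handy, the nowait/connectpoll()
flow described in docs/pg.txt can be exercised roughly like this. It is
only a sketch under assumptions: FakeConn and finish_connect() are made-up
names, and the constant values are local stand-ins for the
pg.PGRES_POLLING_* values the patch exports. Only fileno(), connectpoll()
and the constant names come from the patch itself.

```python
import select
import socket

# Stand-ins for the constants the patch adds to the pg module; real code
# would use pg.PGRES_POLLING_FAILED and friends instead of these values.
(PGRES_POLLING_FAILED, PGRES_POLLING_READING,
 PGRES_POLLING_WRITING, PGRES_POLLING_OK) = range(4)


class FakeConn:
    """Hypothetical stand-in for a connection made with nowait=True."""

    def __init__(self, sock, states):
        self._sock = sock
        self._states = list(states)  # scripted connectpoll() return values

    def fileno(self):
        return self._sock.fileno()

    def connectpoll(self):
        return self._states.pop(0)


def finish_connect(con, timeout=5.0):
    """Drive connectpoll() until the connection is ready.

    Returns True on PGRES_POLLING_OK, False on failure or timeout.
    """
    fd = con.fileno()
    state = PGRES_POLLING_WRITING  # documented initial state
    while state not in (PGRES_POLLING_OK, PGRES_POLLING_FAILED):
        # Wait for whichever direction the previous poll asked for.
        rd = [fd] if state == PGRES_POLLING_READING else []
        wt = [fd] if state == PGRES_POLLING_WRITING else []
        readable, writable, _ = select.select(rd, wt, [], timeout)
        if not readable and not writable:
            return False  # timed out
        state = con.connectpoll()
    return state == PGRES_POLLING_OK
```

With the patch applied, the same loop should work on a real connection
from pg.connect(..., nowait=True), except that connectpoll() raises
pg.InternalError on failure rather than returning PGRES_POLLING_FAILED.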
--
Patrick McPhee
Index: docs/pg.txt
===================================================================
--- docs/pg.txt (revision 528)
+++ docs/pg.txt (working copy)
@@ -59,7 +59,7 @@
-------------------------------
Syntax::
- connect([dbname], [host], [port], [opt], [tty], [user], [passwd])
+ connect([dbname], [host], [port], [opt], [tty], [user], [passwd], [nowait])
Parameters:
:dbname: name of connected database (string/None)
@@ -69,6 +69,7 @@
:tty: debug terminal (string/None)
:user: PostgreSQL user (string/None)
:passwd: password for user (string/None)
+ :nowait: True to perform the connection asynchronously
Return type:
:pgobject: If successful, the `pgobject` handling the connection
@@ -86,6 +87,7 @@
Python tutorial. The names of the keywords are the name of the
parameters given in the syntax line. For a precise description
of the parameters, please refer to the PostgreSQL user manual.
+ See connectpoll() for a description of the nowait parameter.
Examples::
@@ -480,6 +482,95 @@
phone = con.query("select phone from employees"
" where name=$1", (name, )).getresult()
+sendquery - executes a SQL command string asynchronously
+--------------------------------------------------------
+Syntax::
+
+ sendquery(command, [args])
+
+Parameters:
+ :command: SQL command (string)
+ :args: optional positional arguments
+
+Return type:
+ :pgqueryobject
+
+Exceptions raised:
+ :TypeError: bad argument type, or too many arguments
+ :TypeError: invalid connection
+
+Description:
+ `sendquery()` is much the same as `query()`, except that it returns without
+ waiting for the query to complete. The database connection cannot be used
+ for other operations until the query completes, but the application can
+ do other things, including executing queries using other database connections.
+ The application can call `select()` using the connection's `fileno()` to
+ determine when the query has results to return.
+
+ `sendquery()` always returns a `pgqueryobject`. This object differs from
+ the `pgqueryobject` returned by `query()` in a few ways. Most importantly,
+ when `sendquery()` is used, the application must call one of the
+ result-returning methods (`getresult()`, `dictresult()`, or `namedresult()`)
+ on the `pgqueryobject` until it either raises an exception or returns `None`.
+ Otherwise, the database connection will be left in an unusable state.
+
+ In cases when `query()` would return something other than a `pgqueryobject`,
+ that result will be returned by calling one of the result-returning methods
+ on the `pgqueryobject` returned by `sendquery()`. There's one important
+ difference in these result codes: if `query()` returns `None`, the
+ result-returning methods will return an empty string (`''`). It's still
+ necessary to call a result-returning function until it returns `None`.
+
+ `listfields()`, `fieldname()`, `fieldnum()`, and `ntuples()` only work
+ after a call to a result-returning method with a non-`None` return value.
+ `ntuples()` returns only the number of rows returned by the previous
+ result-returning function.
+
+ If multiple semicolon-delimited statements are passed to `query()`, only
+ the results of the last statement are returned in the `pgqueryobject`. With
+ `sendquery()`, all results are returned. Each result set will be
+ returned by a separate call to `getresult()`.
+
+Example::
+
+ name = raw_input("Name? ")
+ pgq = con.sendquery("select phone from employees"
+ " where name=$1", (name, ))
+ phone = pgq.getresult()
+ pgq.getresult() # to close the query
+
+ # initiate two queries in one round trip
+ # note this differs from a union since the result sets have different
+ # structures
+ pgq = con.sendquery("select a,b,c from x where d=e; select e,f from y where g")
+ qrabc = pgq.dictresult() # results from x
+ qref = pgq.dictresult() # results from y
+ pgq.dictresult() # to close the query
+
+ # using select to wait for the query to be ready
+ pgq = con.sendquery("select pg_sleep(20)")
+ r,w,e = select([con.fileno(),other,sockets],[],[])
+ if con.fileno() in r:
+ results = pgq.getresult()
+ pgq.getresult() # to close the query
+
+ # concurrent queries on separate connections
+ con1 = connect()
+ con2 = connect()
+ ss = con1.query("begin; set transaction isolation level repeatable read;"
+ "select pg_export_snapshot();").getresult()[0][0]
+ con2.query("begin; set transaction isolation level repeatable read;"
+ "set transaction snapshot '%s'" % (ss,))
+ pgq1 = con1.sendquery("select a,b,c from x where d=e")
+ pgq2 = con2.sendquery("select e,f from y where g")
+ qr1 = pgq1.getresult()
+ pgq1.getresult()
+ qr2 = pgq2.getresult()
+ pgq2.getresult()
+ con1.query("commit")
+ con2.query("commit")
+
+
reset - resets the connection
-----------------------------
Syntax::
@@ -540,6 +631,59 @@
allows you to explicitly close it. It is mainly here to allow
the DB-SIG API wrapper to implement a close function.
+connectpoll - completes an asynchronous connection
+--------------------------------------------------
+Syntax::
+
+ connectpoll()
+
+Parameters:
+ None
+
+Return type:
+ :int: PGRES_POLLING_OK, PGRES_POLLING_FAILED, PGRES_POLLING_READING or
+ PGRES_POLLING_WRITING
+
+Exceptions raised:
+ :TypeError: too many (any) arguments
+ :TypeError: invalid connection
+ :pg.InternalError: some error occurred during pg connection
+
+Description:
+ The database connection can be performed without any blocking
+ calls. This allows the application mainline to perform other
+ operations or perhaps connect to multiple databases concurrently.
+ Once the connection is established, it's no different from a connection
+ made using blocking calls.
+
+ The required steps are to pass the parameter "nowait=True" to
+ the `connect()` call, then call `connectpoll()` until it either returns
+ `PGRES_POLLING_OK` or raises an exception. To avoid blocking in
+ `connectpoll()`, use `select()` or `poll()` to wait for the connection
+ to be readable or writable, depending on the return code of the
+ previous call to `connectpoll()`. The initial state is
+ `PGRES_POLLING_WRITING`.
+
+Example::
+
+ con = pg.connect('testdb', nowait=True)
+ conno = con.fileno()
+ rd = []
+ wt = [conno]
+ rc = pg.PGRES_POLLING_WRITING
+ while rc not in (pg.PGRES_POLLING_OK,pg.PGRES_POLLING_FAILED):
+ ra,wa,xa = select(rd, wt, [], timeout)
+ if not ra and not wa:
+ timedout()
+
+ rc = con.connectpoll()
+ if rc == pg.PGRES_POLLING_READING:
+ rd = [conno]
+ wt = []
+ else:
+ rd = []
+ wt = [conno]
+
fileno - returns the socket used to connect to the database
-----------------------------------------------------------
Syntax::
@@ -719,6 +863,47 @@
The use of direct access methods may desynchonize client and server.
This method ensure that client and server will be synchronized.
+setnonblocking - puts the connection into non-blocking mode
+-----------------------------------------------------------
+Syntax::
+
+ setnonblocking(nb)
+
+Parameters:
+ :nb: True to put the connection into non-blocking mode. False to put
+ it into blocking mode
+
+Return type:
+ None
+
+Exceptions raised:
+ :TypeError: invalid connection
+ :TypeError: too many parameters
+
+Description:
+ Puts the socket connection into non-blocking mode (or into blocking
+ mode). This affects copy commands and large object operations, but not
+ queries.
+
+isnonblocking - reports the connection's blocking status
+--------------------------------------------------------
+Syntax::
+
+ isnonblocking()
+
+Parameters:
+ None
+
+Return type:
+ :boolean: True if the connection is in non-blocking mode, False otherwise
+
+Exceptions raised:
+ :TypeError: invalid connection
+ :TypeError: too many parameters
+
+Description:
+ Returns True if the connection is in non-blocking mode, False otherwise.
+
locreate - create a large object in the database [LO]
-----------------------------------------------------
Syntax::
@@ -1158,6 +1343,8 @@
Return type:
:list: result values as a list of tuples
+ Other types are possible when the `pgqueryobject` was returned by
+ `sendquery()`
Exceptions raised:
:TypeError: too many (any) parameters
@@ -1179,6 +1366,8 @@
Return type:
:list: result values as a list of dictionaries
+ Other types are possible when the `pgqueryobject` was returned by
+ `sendquery()`
Exceptions raised:
:TypeError: too many (any) parameters
@@ -1200,6 +1389,8 @@
Return type:
:list: result values as a list of named tuples
+ Other types are possible when the `pgqueryobject` was returned by
+ `sendquery()`
Exceptions raised:
:TypeError: too many (any) parameters
Index: pg.py
===================================================================
--- pg.py (revision 895)
+++ pg.py (working copy)
@@ -1291,8 +1291,11 @@
def _namedresult(q):
"""Get query result as named tuples."""
+ # need to call this before listfields for async queries to work
+ res = q.getresult()
+ if not isinstance(res, list): return res
row = _row_factory(q.listfields())
- return [row(r) for r in q.getresult()]
+ return [row(r) for r in res]
class _MemoryQuery:
@@ -1862,6 +1865,32 @@
return self.query(*self.adapter.format_query(
command, parameters, types, inline))
+ def sendquery(self, command, *args):
+ """Execute a SQL command string.
+
+ Similar to query(), but returns without waiting for the query to
+ complete. The non-query return values of query() are returned by
+ getresult(), dictresult(), or namedresult(). You must call one
+ of those functions until it returns None after calling sendquery().
+ """
+ # Wraps shared library function for debugging.
+ if not self.db:
+ raise _int_error('Connection is not valid')
+ if args:
+ self._do_debug(command, args)
+ return self.db.sendquery(command, args)
+ self._do_debug(command)
+ return self.db.sendquery(command)
+
+ def sendquery_formatted(self, command, parameters, types=None, inline=False):
+ """Execute a formatted SQL command string.
+
+ Similar to query_formatted(), but returns without waiting for the
+ query to complete. See sendquery() for caveats.
+ """
+ return self.sendquery(*self.adapter.format_query(
+ command, parameters, types, inline))
+
def pkey(self, table, composite=False, flush=False):
"""Get or set the primary key of a table.
Index: pgmodule.c
===================================================================
--- pgmodule.c (revision 895)
+++ pgmodule.c (working copy)
@@ -176,6 +176,7 @@
connObject *pgcnx; /* parent connection object */
PGresult *result; /* result content */
int encoding; /* client encoding */
+ int async; /* async flag */
} queryObject;
#define is_queryObject(v) (PyType(v) == &queryType)
@@ -2140,6 +2141,52 @@
return (PyObject *) npgobj;
}
+/* for non-query results, sets the appropriate error status, returns
+ * the appropriate value, and frees the result set */
+static PyObject * _check_result_status(int status, PGconn * cnx, PGresult * result)
+{
+ switch (status)
+ {
+ case PGRES_EMPTY_QUERY:
+ PyErr_SetString(PyExc_ValueError, "Empty query");
+ break;
+ case PGRES_BAD_RESPONSE:
+ case PGRES_FATAL_ERROR:
+ case PGRES_NONFATAL_ERROR:
+ set_error(ProgrammingError, "Cannot execute query",
+ cnx, result);
+ break;
+ case PGRES_COMMAND_OK:
+ { /* INSERT, UPDATE, DELETE */
+ Oid oid = PQoidValue(result);
+ if (oid == InvalidOid) /* not a single insert */
+ {
+ char *ret = PQcmdTuples(result);
+ PQclear(result);
+ if (ret[0]) /* return number of rows affected */
+ {
+ return PyStr_FromString(ret);
+ }
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+ /* for a single insert, return the oid */
+ PQclear(result);
+ return PyInt_FromLong(oid);
+ }
+ case PGRES_COPY_OUT: /* no data will be received */
+ case PGRES_COPY_IN:
+ PQclear(result);
+ Py_INCREF(Py_None);
+ return Py_None;
+ default:
+ set_error_msg(InternalError, "Unknown result status");
+ }
+
+ PQclear(result);
+ return NULL; /* error detected on query */
+}
+
/* database query */
static char connQuery__doc__[] =
"query(sql, [arg]) -- create a new query object for this connection\n\n"
@@ -2146,8 +2193,13 @@
"You must pass the SQL (string) request and you can optionally pass\n"
"a tuple with positional parameters.\n";
+static char connSendQuery__doc__[] =
+"sendquery(sql, [args]) -- create a new query object for this connection, using"
+" sql (string) request and optionally a tuple with positional parameters. Returns"
+" without waiting for the query to complete.";
+
static PyObject *
-connQuery(connObject *self, PyObject *args)
+_connQuery(connObject *self, PyObject *args, int async)
{
PyObject *query_obj;
PyObject *param_obj = NULL;
@@ -2287,8 +2339,17 @@
}
Py_BEGIN_ALLOW_THREADS
- result = PQexecParams(self->cnx, query, nparms,
- NULL, (const char * const *)parms, NULL, NULL, 0);
+ if (async)
+ {
+ status = PQsendQueryParams(self->cnx, query, nparms,
+ NULL, (const char * const *)parms, NULL, NULL, 0);
+ result = NULL;
+ }
+ else
+ {
+ result = PQexecParams(self->cnx, query, nparms,
+ NULL, (const char * const *)parms, NULL, NULL, 0);
+ }
Py_END_ALLOW_THREADS
PyMem_Free(parms);
@@ -2298,7 +2359,15 @@
else
{
Py_BEGIN_ALLOW_THREADS
- result = PQexec(self->cnx, query);
+ if (async)
+ {
+ status = PQsendQuery(self->cnx, query);
+ result = NULL;
+ }
+ else
+ {
+ result = PQexec(self->cnx, query);
+ }
Py_END_ALLOW_THREADS
}
@@ -2307,7 +2376,7 @@
Py_XDECREF(param_obj);
/* checks result validity */
- if (!result)
+ if ((!async && !result) || (async && !status))
{
PyErr_SetString(PyExc_ValueError, PQerrorMessage(self->cnx));
return NULL;
@@ -2318,49 +2387,9 @@
self->date_format = date_format; /* this is normally NULL */
/* checks result status */
- if ((status = PQresultStatus(result)) != PGRES_TUPLES_OK)
+ if (!async && (status = PQresultStatus(result)) != PGRES_TUPLES_OK)
{
- switch (status)
- {
- case PGRES_EMPTY_QUERY:
- PyErr_SetString(PyExc_ValueError, "Empty query");
- break;
- case PGRES_BAD_RESPONSE:
- case PGRES_FATAL_ERROR:
- case PGRES_NONFATAL_ERROR:
- set_error(ProgrammingError, "Cannot execute query",
- self->cnx, result);
- break;
- case PGRES_COMMAND_OK:
- { /* INSERT, UPDATE, DELETE */
- Oid oid = PQoidValue(result);
- if (oid == InvalidOid) /* not a single insert */
- {
- char *ret = PQcmdTuples(result);
-
- PQclear(result);
- if (ret[0]) /* return number of rows affected */
- {
- return PyStr_FromString(ret);
- }
- Py_INCREF(Py_None);
- return Py_None;
- }
- /* for a single insert, return the oid */
- PQclear(result);
- return PyInt_FromLong(oid);
- }
- case PGRES_COPY_OUT: /* no data will be received */
- case PGRES_COPY_IN:
- PQclear(result);
- Py_INCREF(Py_None);
- return Py_None;
- default:
- set_error_msg(InternalError, "Unknown result status");
- }
-
- PQclear(result);
- return NULL; /* error detected on query */
+ return _check_result_status(status, self->cnx, result);
}
if (!(npgobj = PyObject_NEW(queryObject, &queryType)))
@@ -2371,9 +2400,23 @@
npgobj->pgcnx = self;
npgobj->result = result;
npgobj->encoding = encoding;
+ npgobj->async = async;
return (PyObject *) npgobj;
}
+static PyObject *
+connQuery(connObject *self, PyObject *args)
+{
+ return _connQuery(self, args, 0);
+}
+
+static PyObject *
+connSendQuery(connObject *self, PyObject *args)
+{
+ return _connQuery(self, args, 1);
+}
+
+
#ifdef DIRECT_ACCESS
static char connPutLine__doc__[] =
"putline(line) -- send a line directly to the backend";
@@ -2466,6 +2509,74 @@
Py_INCREF(Py_None);
return Py_None;
}
+
+
+/* direct access function : setnonblocking */
+static char connSetNonBlocking__doc__[] =
+"setnonblocking(nb) -- puts direct copy functions in non-blocking mode";
+
+static PyObject *
+connSetNonBlocking(connObject *self, PyObject *args)
+{
+ int nonblocking;
+
+ if (!self->cnx)
+ {
+ PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+ return NULL;
+ }
+
+ /* reads args */
+ if (!PyArg_ParseTuple(args, "i", &nonblocking))
+ {
+ PyErr_SetString(PyExc_TypeError, "setnonblocking(tf), with boolean.");
+ return NULL;
+ }
+
+ /* returns -1 on error, or 0 on success */
+ if (PQsetnonblocking(self->cnx, nonblocking) < 0)
+ {
+ PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+ return NULL;
+ }
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+/* direct access function : isnonblocking */
+static char connIsNonBlocking__doc__[] =
+"isnonblocking() -- reports the non-blocking status of copy functions";
+
+static PyObject *
+connIsNonBlocking(connObject *self, PyObject *args)
+{
+ int rc;
+
+ if (!self->cnx)
+ {
+ PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+ return NULL;
+ }
+
+ /* reads args */
+ if (!PyArg_ParseTuple(args, ""))
+ {
+ PyErr_SetString(PyExc_TypeError,
+ "method isnonblocking() takes no parameters.");
+ return NULL;
+ }
+
+ /* returns 1 if non-blocking, 0 if blocking. not sure if it can return -1 */
+ rc = PQisnonblocking(self->cnx);
+ if (rc < 0)
+ {
+ PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+ return NULL;
+ }
+
+ return PyBool_FromLong(rc);
+}
+
#endif /* DIRECT_ACCESS */
/* return query as string in human readable form */
@@ -3260,6 +3371,44 @@
return ret;
}
+/* get asynchronous connection state */
+static char connConnectPoll__doc__[] =
+"Initiates phases of the asynchronous connection process"
+" and returns the connection state.";
+
+static PyObject *
+connConnectPoll(connObject *self, PyObject *args)
+{
+ int rc;
+
+ if (!self->cnx)
+ {
+ PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+ return NULL;
+ }
+
+ /* checks args */
+ if (!PyArg_ParseTuple(args, ""))
+ {
+ PyErr_SetString(PyExc_TypeError,
+ "method connectpoll() takes no parameters.");
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ rc = PQconnectPoll(self->cnx);
+ Py_END_ALLOW_THREADS
+
+ if (rc == PGRES_POLLING_FAILED)
+ {
+ set_error_msg(InternalError, PQerrorMessage(self->cnx));
+ /* self is a borrowed reference; it must not be released here */
+ return NULL;
+ }
+
+ return PyInt_FromLong(rc);
+}
+
/* set notice receiver callback function */
static char connSetNoticeReceiver__doc__[] =
"set_notice_receiver(func) -- set the current notice receiver";
@@ -3410,9 +3559,12 @@
{"source", (PyCFunction) connSource, METH_NOARGS, connSource__doc__},
{"query", (PyCFunction) connQuery, METH_VARARGS, connQuery__doc__},
+ {"sendquery", (PyCFunction) connSendQuery, METH_VARARGS, connSendQuery__doc__},
{"reset", (PyCFunction) connReset, METH_NOARGS, connReset__doc__},
{"cancel", (PyCFunction) connCancel, METH_NOARGS, connCancel__doc__},
{"close", (PyCFunction) connClose, METH_NOARGS, connClose__doc__},
+ {"connectpoll", (PyCFunction) connConnectPoll, METH_VARARGS,
+ connConnectPoll__doc__},
{"fileno", (PyCFunction) connFileno, METH_NOARGS, connFileno__doc__},
{"get_cast_hook", (PyCFunction) connGetCastHook, METH_NOARGS,
connGetCastHook__doc__},
@@ -3448,6 +3600,8 @@
{"putline", (PyCFunction) connPutLine, METH_VARARGS, connPutLine__doc__},
{"getline", (PyCFunction) connGetLine, METH_NOARGS, connGetLine__doc__},
{"endcopy", (PyCFunction) connEndCopy, METH_NOARGS, connEndCopy__doc__},
+ {"setnonblocking", (PyCFunction) connSetNonBlocking, METH_VARARGS, connSetNonBlocking__doc__},
+ {"isnonblocking", (PyCFunction) connIsNonBlocking, METH_VARARGS, connIsNonBlocking__doc__},
#endif /* DIRECT_ACCESS */
#ifdef LARGE_OBJECTS
@@ -4408,7 +4562,7 @@
/* connects to a database */
static char pgConnect__doc__[] =
-"connect(dbname, host, port, opt) -- connect to a PostgreSQL database\n\n"
+"connect(dbname, host, port, opt, user, password, nowait) -- connect to a PostgreSQL database\n\n"
"The connection uses the specified parameters (optional, keywords aware).\n";
static PyObject *
@@ -4415,7 +4569,7 @@
pgConnect(PyObject *self, PyObject *args, PyObject *dict)
{
static const char *kwlist[] = {"dbname", "host", "port", "opt",
- "user", "passwd", NULL};
+ "user", "passwd", "nowait", NULL};
char *pghost,
*pgopt,
@@ -4422,8 +4576,12 @@
*pgdbname,
*pguser,
*pgpasswd;
- int pgport;
+ int pgport,
+ nowait = 0,
+ kwcount = 0;
char port_buffer[20];
+ const char *keywords[sizeof(kwlist)/sizeof(*kwlist)+1],
+ *values[sizeof(kwlist)/sizeof(*kwlist)+1];
connObject *npgobj;
pghost = pgopt = pgdbname = pguser = pgpasswd = NULL;
@@ -4435,8 +4593,8 @@
* don't declare kwlist as const char *kwlist[] then it complains when
* I try to assign all those constant strings to it.
*/
- if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzz", (char **) kwlist,
- &pgdbname, &pghost, &pgport, &pgopt, &pguser, &pgpasswd))
+ if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzi", (char **) kwlist,
+ &pgdbname, &pghost, &pgport, &pgopt, &pguser, &pgpasswd, &nowait))
return NULL;
#ifdef DEFAULT_VARS
@@ -4479,8 +4637,55 @@
}
Py_BEGIN_ALLOW_THREADS
- npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
- pgopt, NULL, pgdbname, pguser, pgpasswd);
+ if (!nowait)
+ {
+ npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
+ pgopt, NULL, pgdbname, pguser, pgpasswd);
+ }
+ else
+ {
+ if (pghost)
+ {
+ keywords[kwcount] = "host";
+ values[kwcount] = pghost;
+ kwcount++;
+ }
+ if (pgopt)
+ {
+ keywords[kwcount] = "options";
+ values[kwcount] = pgopt;
+ kwcount++;
+ }
+ if (pgdbname)
+ {
+ keywords[kwcount] = "dbname";
+ values[kwcount] = pgdbname;
+ kwcount++;
+ }
+ if (pguser)
+ {
+ keywords[kwcount] = "user";
+ values[kwcount] = pguser;
+ kwcount++;
+ }
+ if (pgpasswd)
+ {
+ keywords[kwcount] = "password";
+ values[kwcount] = pgpasswd;
+ kwcount++;
+ }
+ if (pgport != -1)
+ {
+ keywords[kwcount] = "port";
+ values[kwcount] = port_buffer;
+ kwcount++;
+ }
+ keywords[kwcount] = values[kwcount] = NULL;
+
+ /* The final argument means dbname can be a full connection string,
+ * which is always the case for PQsetdbLogin() */
+ npgobj->cnx = PQconnectStartParams(keywords, values, 1);
+ }
Py_END_ALLOW_THREADS
if (PQstatus(npgobj->cnx) == CONNECTION_BAD)
@@ -4598,6 +4803,78 @@
return PyInt_FromLong(num);
}
+
+/* Retrieve the result set after an async query. This is destructive
+ * so it should only be done when we want the result set (i.e., not
+ * just for an ntuples call). It may also return a non-result value
+ * through *pval. Returns 1 if it's doing that, or 0 otherwise. */
+static int
+queryAsyncResult(queryObject *self, PyObject **pval)
+{
+ /* this is following an async call, so need to get the result first */
+ if (self->async)
+ {
+ int status;
+
+ if (!self->pgcnx)
+ {
+ PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+ *pval = NULL;
+ return 1;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ if (self->result)
+ {
+ PQclear(self->result);
+ }
+ self->result = PQgetResult(self->pgcnx->cnx);
+ Py_END_ALLOW_THREADS
+ /* end of result set, return None */
+ if (!self->result)
+ {
+ Py_DECREF(self->pgcnx);
+ self->pgcnx = NULL;
+ Py_INCREF(Py_None);
+ *pval = Py_None;
+ return 1;
+ }
+
+ if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+ {
+ *pval = _check_result_status(status, self->pgcnx->cnx, self->result);
+ /* _check_result_status calls PQclear() so we need to clear this
+ * attribute */
+ self->result = NULL;
+ /* throwing an exception. Need to call PQgetResult() to clear the
+ * connection state. This should return NULL the first time. */
+ if (!*pval)
+ {
+ self->result = PQgetResult(self->pgcnx->cnx);
+ while (self->result)
+ {
+ PQclear(self->result);
+ self->result = PQgetResult(self->pgcnx->cnx);
+ }
+ }
+ /* it's confusing to return None here because the caller has to
+ * call again until we return None. We can't just consume that
+ * final None because we don't know if there are additional
+ * statements following this one, so we return an empty string where
+ * query() would return None */
+ else if (*pval == Py_None)
+ {
+ Py_DECREF(*pval);
+ *pval = PyStr_FromString("");
+ }
+ return 1;
+ }
+ }
+ return 0;
+}
+
+
+
/* retrieves last result */
static char queryGetResult__doc__[] =
"getresult() -- Get the result of a query\n\n"
@@ -4607,10 +4884,16 @@
static PyObject *
queryGetResult(queryObject *self, PyObject *noargs)
{
- PyObject *reslist;
+ PyObject *reslist,
+ *val;
int i, m, n, *col_types;
int encoding = self->encoding;
+ if (queryAsyncResult(self, &val))
+ {
+ return val;
+ }
+
/* stores result in tuple */
m = PQntuples(self->result);
n = PQnfields(self->result);
@@ -4632,8 +4915,6 @@
for (j = 0; j < n; ++j)
{
- PyObject * val;
-
if (PQgetisnull(self->result, i, j))
{
Py_INCREF(Py_None);
@@ -4693,7 +4974,8 @@
static PyObject *
queryDictResult(queryObject *self, PyObject *noargs)
{
- PyObject *reslist;
+ PyObject *reslist,
+ *val;
int i,
m,
n,
@@ -4700,6 +4982,11 @@
*col_types;
int encoding = self->encoding;
+ if (queryAsyncResult(self, &val))
+ {
+ return val;
+ }
+
/* stores result in list */
m = PQntuples(self->result);
n = PQnfields(self->result);
@@ -6034,6 +6321,12 @@
PyDict_SetItemString(dict, "SEEK_END", PyInt_FromLong(SEEK_END));
#endif /* LARGE_OBJECTS */
+ /* PQconnectPoll() results */
+ PyDict_SetItemString(dict,"PGRES_POLLING_OK",PyInt_FromLong(PGRES_POLLING_OK));
+ PyDict_SetItemString(dict,"PGRES_POLLING_FAILED",PyInt_FromLong(PGRES_POLLING_FAILED));
+ PyDict_SetItemString(dict,"PGRES_POLLING_READING",PyInt_FromLong(PGRES_POLLING_READING));
+ PyDict_SetItemString(dict,"PGRES_POLLING_WRITING",PyInt_FromLong(PGRES_POLLING_WRITING));
+
#ifdef DEFAULT_VARS
/* prepares default values */
Py_INCREF(Py_None);
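
The rule in the sendquery() documentation above (keep calling a
result-returning method until it returns None) is easy to wrap in a helper.
A minimal sketch, assuming nothing beyond that rule: FakeAsyncQuery and
drain() are hypothetical names used only for illustration and are not part
of the patch.

```python
class FakeAsyncQuery:
    """Hypothetical stand-in for the pgqueryobject returned by sendquery()."""

    def __init__(self, results):
        self._pending = list(results)

    def getresult(self):
        # Mimics the documented behaviour: one result per call, then None
        # once every statement's result has been consumed.
        return self._pending.pop(0) if self._pending else None


def drain(pgq):
    """Collect every result of an async query, stopping at the final None."""
    results = []
    while True:
        res = pgq.getresult()
        if res is None:
            return results
        # Row lists, row counts, OIDs and '' (where query() would have
        # returned None) are all kept as-is.
        results.append(res)
```

Against a real connection this would be called as
drain(con.sendquery("select 1; select 2")), which per the documentation
should yield one entry per statement.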