I submitted a patch a while ago to support asynchronous connections and
queries (Ticket #49). At roughly the same time, D'Arcy started checking
in changes to support python 3, so the patch didn't work any more. I've
had my head under water since then so I haven't had a chance to get back
to it, but here's a new patch that was recently generated against head
and should apply and build cleanly.

Looking at it now, I seem to be missing unit tests. I'll see what
happened to them and send those in later.


-- 
Patrick McPhee

Index: docs/pg.txt
===================================================================
--- docs/pg.txt	(revision 528)
+++ docs/pg.txt	(working copy)
@@ -59,7 +59,7 @@
 -------------------------------
 Syntax::
 
-  connect([dbname], [host], [port], [opt], [tty], [user], [passwd])
+  connect([dbname], [host], [port], [opt], [tty], [user], [passwd], [nowait])
 
 Parameters:
   :dbname: name of connected database (string/None)
@@ -69,6 +69,7 @@
   :tty:    debug terminal (string/None)
   :user:   PostgreSQL user (string/None)
   :passwd: password for user (string/None)
+  :nowait: True to perform the connection asynchronously
 
 Return type:
   :pgobject: If successful, the `pgobject` handling the connection
@@ -86,6 +87,7 @@
   Python tutorial. The names of the keywords are the name of the
   parameters given in the syntax line. For a precise description
   of the parameters, please refer to the PostgreSQL user manual.
+  See connectpoll() for a description of the nowait parameter.
 
 Examples::
 
@@ -480,6 +482,95 @@
   phone = con.query("select phone from employees"
     " where name=$1", (name, )).getresult()
 
+sendquery - executes a SQL command string asynchronously
+--------------------------------------------------------
+Syntax::
+
+  sendquery(command, [args])
+
+Parameters:
+  :command: SQL command (string)
+  :args: optional positional arguments
+
+Return type:
+  :pgqueryobject
+
+Exceptions raised:
+  :TypeError: bad argument type, or too many arguments
+  :TypeError: invalid connection
+
+Description:
+  `sendquery()` is much the same as `query()`, except that it returns without
+  waiting for the query to complete. The database connection cannot be used
+  for other operations until the query completes, but the application can
+  do other things, including executing queries using other database connections.
+  The application can call `select()` using the connection's `fileno()` to
+  determine when the query has results to return.
+
+  `sendquery()` always returns a `pgqueryobject`. This object differs from
+  the `pgqueryobject` returned by `query()` in a few ways. Most importantly,
+  when `sendquery()` is used, the application must call one of the
+  result-returning methods (`getresult()`, `dictresult()`, or `namedresult()`)
+  on the `pgqueryobject` until it either throws an exception or returns `None`.
+  Otherwise, the database connection will be left in an unusable state.
+
+  In cases when `query()` would return something other than a `pgqueryobject`,
+  that result will be returned by calling one of the result-returning methods
+  on the `pgqueryobject` returned by `sendquery()`. There's one important
+  difference in these result codes: if `query()` returns `None`, the
+  result-returning methods will return an empty string (`''`). It's still
+  necessary to call a result-returning function until it returns `None`.
+
+  `listfields()`, `fieldname()`, `fieldnum()`, and `ntuples()` only work
+  after a call to a result-returning method with a non-`None` return value.
+  `ntuples()` returns only the number of rows returned by the previous
+  result-returning function. 
+
+  If multiple semi-colon-delimited statements are passed to `query()`, only
+  the results of the last statement are returned in the `pgqueryobject`. With
+  `sendquery()`, all results are returned. Each result set will be
+  returned by a separate call to `getresult()`.
+
+Example::
+
+  name = raw_input("Name? ")
+  pgq = con.sendquery("select phone from employees"
+    " where name=$1", (name, ))
+  phone = pgq.getresult()
+  pgq.getresult() # to close the query
+
+  # initiate two queries in one round trip
+  # note this differs from a union since the result sets have different
+  # structures
+  pgq = con.sendquery("select a,b,c from x where d=e; select e,f from y where g")
+  qrabc = pgq.dictresult() # results from x
+  qref = pgq.dictresult()  # results from y
+  pgq.dictresult()         # to close the query
+
+  # using select to wait for the query to be ready
+  pgq = con.sendquery("select pg_sleep(20)")
+  r,w,e = select([con.fileno(),other,sockets],[],[])
+  if con.fileno() in r:
+      results = pgq.getresult()
+      pgq.getresult() # to close the query
+
+  # concurrent queries on separate connections
+  con1 = connect()
+  con2 = connect()
+  ss = con1.query("begin; set transaction isolation level repeatable read;"
+                  "select pg_export_snapshot();").getresult()[0][0]
+  con2.query("begin; set transaction isolation level repeatable read;"
+             "set transaction snapshot '%s'" % (ss,))
+  pgq1 = con1.sendquery("select a,b,c from x where d=e")
+  pgq2 = con2.sendquery("select e,f from y where g")
+  qr1 = pgq1.getresult()
+  pgq1.getresult()
+  qr2 = pgq2.getresult()
+  pgq2.getresult()
+  con1.query("commit")
+  con2.query("commit")
+
+
 reset - resets the connection
 -----------------------------
 Syntax::
@@ -540,6 +631,59 @@
   allows you to explicitly close it. It is mainly here to allow
   the DB-SIG API wrapper to implement a close function.
 
+connectpoll - completes an asynchronous connection
+--------------------------------------------------
+Syntax::
+
+  connectpoll()
+
+Parameters:
+  None
+
+Return type:
+  :int: PGRES_POLLING_OK, PGRES_POLLING_FAILED, PGRES_POLLING_READING or
+        PGRES_POLLING_WRITING
+
+Exceptions raised:
+  :TypeError: too many (any) arguments
+  :TypeError: invalid connection
+  :pg.InternalError: some error occurred during pg connection
+
+Description:
+  The database connection can be performed without any blocking
+  calls. This allows the application mainline to perform other 
+  operations or perhaps connect to multiple databases concurrently.
+  Once the connecton is established, it's no different from a connection
+  made using blocking calls.
+
+  The required steps are to pass the parameter "nowait=True" to
+  the `connect()` call, then call `connectpoll()` until it either returns
+  `PGRES_POLLING_OK` or raises an exception. To avoid blocking in
+  `connectpoll()`, use `select()` or `poll()` to wait for the connection
+  to be readable or writable, depending on the return code of the
+  previous call to `connectpoll()`. The initial state is
+  `PGRES_POLLING_WRITING`.
+
+Example::
+
+  con = pg.connect('testdb', nowait=True)
+  conno = con.fileno()
+  rd = []
+  wt = [conno]
+  rc = pg.PGRES_POLLING_WRITING
+  while rc not in (pg.PGRES_POLLING_OK,pg.PGRES_POLLING_FAILED):
+      ra,wa,xa = select(rd, wt, [], timeout)
+      if not ra and not wa:
+          timedout()
+
+      rc = con.connectpoll()
+      if rc == pg.PGRES_POLLING_READING:
+          rd = [conno]
+          wt = []
+      else:
+          rd = []
+          wt = [conno]
+          
 fileno - returns the socket used to connect to the database
 -----------------------------------------------------------
 Syntax::
@@ -719,6 +863,47 @@
   The use of direct access methods may desynchonize client and server.
   This method ensure that client and server will be synchronized.
 
+setnonblocking - puts the connection into non-blocking mode
+-----------------------------------------------------------
+Syntax::
+
+  setnonblocking(nb)
+
+Parameters:
+  :nb: True to put the connection into non-blocking mode. False to put
+       it into blocking mode
+
+Return type:
+  None
+
+Exceptions raised:
+  :TypeError: invalid connection
+  :TypeError: too many parameters
+
+Description:
+  Puts the socket connection into non-blocking mode (or into blocking
+  mode). This affects copy commands and large object operations, but not
+  queries.
+
+isnonblocking - reports the connection's blocking status
+--------------------------------------------------------
+Syntax::
+
+  isnonblocking()
+
+Parameters:
+  None
+
+Return type:
+  :boolean: True if the connection is in non-blocking mode, False otherwise 
+
+Exceptions raised:
+  :TypeError: invalid connection
+  :TypeError: too many parameters
+
+Description:
+  Returns True if the connection is in non-blocking mode. False otherwise.
+
 locreate - create a large object in the database [LO]
 -----------------------------------------------------
 Syntax::
@@ -1158,6 +1343,8 @@
 
 Return type:
   :list: result values as a list of tuples
+  Other types are possible when the `pgqueryobject` was returned by
+  `sendquery()`
 
 Exceptions raised:
   :TypeError: too many (any) parameters
@@ -1179,6 +1366,8 @@
 
 Return type:
   :list: result values as a list of dictionaries
+  Other types are possible when the `pgqueryobject` was returned by
+  `sendquery()`
 
 Exceptions raised:
   :TypeError: too many (any) parameters
@@ -1200,6 +1389,8 @@
 
 Return type:
   :list: result values as a list of named tuples
+  Other types are possible when the `pgqueryobject` was returned by
+  `sendquery()`
 
 Exceptions raised:
   :TypeError: too many (any) parameters
Index: pg.py
===================================================================
--- pg.py	(revision 895)
+++ pg.py	(working copy)
@@ -1291,8 +1291,11 @@
 
 def _namedresult(q):
     """Get query result as named tuples."""
+    # need to call this before listfields for async queries to work
+    res = q.getresult()
+    if not isinstance(res, list): return res
     row = _row_factory(q.listfields())
-    return [row(r) for r in q.getresult()]
+    return [row(r) for r in res]
 
 
 class _MemoryQuery:
@@ -1862,6 +1865,32 @@
         return self.query(*self.adapter.format_query(
             command, parameters, types, inline))
 
+    def sendquery(self, command, *args):
+        """Execute a SQL command string.
+
+        Similar to query(), but returns without waiting for the query to
+        return. The non-query return values of query() are returned by
+	getresult(), dictresult(), or namedresult(). You must call one
+	of those functions until it returns None after calling sendquery().
+        """
+        # Wraps shared library function for debugging.
+        if not self.db:
+            raise _int_error('Connection is not valid')
+        if args:
+            self._do_debug(command, args)
+            return self.db.sendquery(command, args)
+        self._do_debug(command)
+        return self.db.sendquery(command)
+
+    def sendquery_formatted(self, command, parameters, types=None, inline=False):
+        """Execute a formatted SQL command string.
+
+        Similar to query_formatted(), but returns without waiting for the
+        query to complete. See sendquery() for caveats.
+        """
+        return self.sendquery(*self.adapter.format_query(
+            command, parameters, types, inline))
+
     def pkey(self, table, composite=False, flush=False):
         """Get or set the primary key of a table.
 
Index: pgmodule.c
===================================================================
--- pgmodule.c	(revision 895)
+++ pgmodule.c	(working copy)
@@ -176,6 +176,7 @@
 	connObject *pgcnx;			/* parent connection object */
 	PGresult   *result;			/* result content */
 	int			encoding; 		/* client encoding */
+	int			async;			/* async flag */
 }	queryObject;
 #define is_queryObject(v) (PyType(v) == &queryType)
 
@@ -2140,6 +2141,52 @@
 	return (PyObject *) npgobj;
 }
 
+/* for non-query results, sets the appropriate error status, returns
+ * the appropriate value, and frees the result set */
+static PyObject * _check_result_status(int status, PGconn * cnx, PGresult * result)
+{
+	switch (status)
+	{
+		case PGRES_EMPTY_QUERY:
+			PyErr_SetString(PyExc_ValueError, "Empty query");
+			break;
+		case PGRES_BAD_RESPONSE:
+		case PGRES_FATAL_ERROR:
+		case PGRES_NONFATAL_ERROR:
+			set_error(ProgrammingError, "Cannot execute query",
+					cnx, result);
+			break;
+		case PGRES_COMMAND_OK:
+			{						/* INSERT, UPDATE, DELETE */
+				Oid		oid = PQoidValue(result);
+				if (oid == InvalidOid)	/* not a single insert */
+				{
+					char	*ret = PQcmdTuples(result);
+						PQclear(result);
+					if (ret[0])		/* return number of rows affected */
+					{
+						return PyStr_FromString(ret);
+					}
+					Py_INCREF(Py_None);
+					return Py_None;
+				}
+				/* for a single insert, return the oid */
+				PQclear(result);
+				return PyInt_FromLong(oid);
+			}
+		case PGRES_COPY_OUT:		/* no data will be received */
+		case PGRES_COPY_IN:
+			PQclear(result);
+			Py_INCREF(Py_None);
+			return Py_None;
+		default:
+			set_error_msg(InternalError, "Unknown result status");
+	}
+
+	PQclear(result);
+	return NULL;			/* error detected on query */
+}
+
 /* database query */
 static char connQuery__doc__[] =
 "query(sql, [arg]) -- create a new query object for this connection\n\n"
@@ -2146,8 +2193,13 @@
 "You must pass the SQL (string) request and you can optionally pass\n"
 "a tuple with positional parameters.\n";
 
+static char connSendQuery__doc__[] =
+"sendquery(sql, [args]) -- create a new query object for this connection, using"
+" sql (string) request and optionally a tuple with positional parameters. Returns"
+" without waiting for the query to complete.";
+
 static PyObject *
-connQuery(connObject *self, PyObject *args)
+_connQuery(connObject *self, PyObject *args, int async)
 {
 	PyObject	*query_obj;
 	PyObject	*param_obj = NULL;
@@ -2287,8 +2339,17 @@
 		}
 
 		Py_BEGIN_ALLOW_THREADS
-		result = PQexecParams(self->cnx, query, nparms,
-			NULL, (const char * const *)parms, NULL, NULL, 0);
+		if (async)
+		{
+			status = PQsendQueryParams(self->cnx, query, nparms,
+				NULL, (const char * const *)parms, NULL, NULL, 0);
+			result = NULL;
+		}
+		else
+		{
+			result = PQexecParams(self->cnx, query, nparms,
+				NULL, (const char * const *)parms, NULL, NULL, 0);
+		}
 		Py_END_ALLOW_THREADS
 
 		PyMem_Free(parms);
@@ -2298,7 +2359,15 @@
 	else
 	{
 		Py_BEGIN_ALLOW_THREADS
-		result = PQexec(self->cnx, query);
+		if (async)
+		{
+			status = PQsendQuery(self->cnx, query);
+			result = NULL;
+		}
+		else
+		{
+			result = PQexec(self->cnx, query);
+		}
 		Py_END_ALLOW_THREADS
 	}
 
@@ -2307,7 +2376,7 @@
 	Py_XDECREF(param_obj);
 
 	/* checks result validity */
-	if (!result)
+	if ((!async && !result) || (async && !status))
 	{
 		PyErr_SetString(PyExc_ValueError, PQerrorMessage(self->cnx));
 		return NULL;
@@ -2318,49 +2387,9 @@
 	self->date_format = date_format; /* this is normally NULL */
 
 	/* checks result status */
-	if ((status = PQresultStatus(result)) != PGRES_TUPLES_OK)
+	if (!async && (status = PQresultStatus(result)) != PGRES_TUPLES_OK)
 	{
-		switch (status)
-		{
-			case PGRES_EMPTY_QUERY:
-				PyErr_SetString(PyExc_ValueError, "Empty query");
-				break;
-			case PGRES_BAD_RESPONSE:
-			case PGRES_FATAL_ERROR:
-			case PGRES_NONFATAL_ERROR:
-				set_error(ProgrammingError, "Cannot execute query",
-					self->cnx, result);
-				break;
-			case PGRES_COMMAND_OK:
-				{						/* INSERT, UPDATE, DELETE */
-					Oid		oid = PQoidValue(result);
-					if (oid == InvalidOid)	/* not a single insert */
-					{
-						char	*ret = PQcmdTuples(result);
-
-						PQclear(result);
-						if (ret[0])		/* return number of rows affected */
-						{
-							return PyStr_FromString(ret);
-						}
-						Py_INCREF(Py_None);
-						return Py_None;
-					}
-					/* for a single insert, return the oid */
-					PQclear(result);
-					return PyInt_FromLong(oid);
-				}
-			case PGRES_COPY_OUT:		/* no data will be received */
-			case PGRES_COPY_IN:
-				PQclear(result);
-				Py_INCREF(Py_None);
-				return Py_None;
-			default:
-				set_error_msg(InternalError, "Unknown result status");
-		}
-
-		PQclear(result);
-		return NULL;			/* error detected on query */
+		return _check_result_status(status, self->cnx, result);
 	}
 
 	if (!(npgobj = PyObject_NEW(queryObject, &queryType)))
@@ -2371,9 +2400,23 @@
 	npgobj->pgcnx = self;
 	npgobj->result = result;
 	npgobj->encoding = encoding;
+	npgobj->async = async;
 	return (PyObject *) npgobj;
 }
 
+static PyObject *
+connQuery(connObject *self, PyObject *args)
+{
+	return _connQuery(self, args, 0);
+}
+
+static PyObject *
+connSendQuery(connObject *self, PyObject *args )
+{
+	return _connQuery(self, args, 1);
+}
+
+
 #ifdef DIRECT_ACCESS
 static char connPutLine__doc__[] =
 "putline(line) -- send a line directly to the backend";
@@ -2466,6 +2509,74 @@
 	Py_INCREF(Py_None);
 	return Py_None;
 }
+
+
+/* direct access function : setnonblocking */
+static char connSetNonBlocking__doc__[] =
+"setnonblocking() -- puts direct copy functions in non-blocking mode";
+
+static PyObject *
+connSetNonBlocking(connObject *self, PyObject *args)
+{
+	int nonblocking;
+
+	if (!self->cnx)
+	{
+		PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+		return NULL;
+	}
+
+	/* reads args */
+	if (!PyArg_ParseTuple(args, "i", &nonblocking))
+	{
+		PyErr_SetString(PyExc_TypeError, "setnonblocking(tf), with boolean.");
+		return NULL;
+	}
+
+	/* returns -1 on error, or 0 on success */
+	if (PQsetnonblocking(self->cnx, nonblocking) < 0)
+	{
+		PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+		return NULL;
+	}
+	Py_INCREF(Py_None);
+	return Py_None;
+}
+
+/* direct access function : isnonblocking */
+static char connIsNonBlocking__doc__[] =
+"isnonblocking() -- reports the non-blocking status of copy functions";
+
+static PyObject *
+connIsNonBlocking(connObject *self, PyObject *args)
+{
+	int rc;
+
+	if (!self->cnx)
+	{
+		PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+		return NULL;
+	}
+
+	/* reads args */
+	if (!PyArg_ParseTuple(args, ""))
+	{
+		PyErr_SetString(PyExc_TypeError,
+			"method isnonblocking() takes no parameters.");
+		return NULL;
+	}
+
+	/* returns 1 if blocking, 0 otherwise. not sure if it can return -1 */
+	rc = PQisnonblocking(self->cnx);
+	if (rc < 0)
+	{
+		PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+		return NULL;
+	}
+
+	return PyBool_FromLong(rc);
+}
+
 #endif /* DIRECT_ACCESS */
 
 /* return query as string in human readable form */
@@ -3260,6 +3371,44 @@
 	return ret;
 }
 
+/* get asynchronous connection state */
+static char connConnectPoll__doc__[] =
+"Initiates phases of the asynchronous connection process"
+" and returns the connection state.";
+
+static PyObject *
+connConnectPoll(connObject *self, PyObject *args)
+{
+	int rc;
+
+	if (!self->cnx)
+	{
+		PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+		return NULL;
+	}
+
+	/* checks args */
+	if (!PyArg_ParseTuple(args, ""))
+	{
+		PyErr_SetString(PyExc_TypeError,
+			"method connectPoll() takes no parameters.");
+		return NULL;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	rc =PQconnectPoll(self->cnx);
+	Py_END_ALLOW_THREADS
+
+	if (rc == PGRES_POLLING_FAILED)
+	{
+		set_error_msg(InternalError, PQerrorMessage(self->cnx));
+		Py_XDECREF(self);
+		return NULL;
+	}
+
+	return PyInt_FromLong(rc);
+}
+
 /* set notice receiver callback function */
 static char connSetNoticeReceiver__doc__[] =
 "set_notice_receiver(func) -- set the current notice receiver";
@@ -3410,9 +3559,12 @@
 
 	{"source", (PyCFunction) connSource, METH_NOARGS, connSource__doc__},
 	{"query", (PyCFunction) connQuery, METH_VARARGS, connQuery__doc__},
+	{"sendquery", (PyCFunction) connSendQuery, METH_VARARGS, connSendQuery__doc__},
 	{"reset", (PyCFunction) connReset, METH_NOARGS, connReset__doc__},
 	{"cancel", (PyCFunction) connCancel, METH_NOARGS, connCancel__doc__},
 	{"close", (PyCFunction) connClose, METH_NOARGS, connClose__doc__},
+	{"connectpoll", (PyCFunction) connConnectPoll, METH_VARARGS,
+			connConnectPoll__doc__},
 	{"fileno", (PyCFunction) connFileno, METH_NOARGS, connFileno__doc__},
 	{"get_cast_hook", (PyCFunction) connGetCastHook, METH_NOARGS,
 			connGetCastHook__doc__},
@@ -3448,6 +3600,8 @@
 	{"putline", (PyCFunction) connPutLine, METH_VARARGS, connPutLine__doc__},
 	{"getline", (PyCFunction) connGetLine, METH_NOARGS, connGetLine__doc__},
 	{"endcopy", (PyCFunction) connEndCopy, METH_NOARGS, connEndCopy__doc__},
+	{"setnonblocking", (PyCFunction) connSetNonBlocking, 1, connSetNonBlocking__doc__},
+	{"isnonblocking", (PyCFunction) connIsNonBlocking, 1, connIsNonBlocking__doc__},
 #endif /* DIRECT_ACCESS */
 
 #ifdef LARGE_OBJECTS
@@ -4408,7 +4562,7 @@
 
 /* connects to a database */
 static char pgConnect__doc__[] =
-"connect(dbname, host, port, opt) -- connect to a PostgreSQL database\n\n"
+"connect(dbname, host, port, opt, user, password, nowait) -- connect to a PostgreSQL database\n\n"
 "The connection uses the specified parameters (optional, keywords aware).\n";
 
 static PyObject *
@@ -4415,7 +4569,7 @@
 pgConnect(PyObject *self, PyObject *args, PyObject *dict)
 {
 	static const char *kwlist[] = {"dbname", "host", "port", "opt",
-	"user", "passwd", NULL};
+	"user", "passwd", "nowait", NULL};
 
 	char	   *pghost,
 			   *pgopt,
@@ -4422,8 +4576,12 @@
 			   *pgdbname,
 			   *pguser,
 			   *pgpasswd;
-	int			pgport;
+	int			pgport,
+				nowait = 0,
+				kwcount = 0;
 	char		port_buffer[20];
+	const char *keywords[sizeof(kwlist)/sizeof(*kwlist)+1],
+			   *values[sizeof(kwlist)/sizeof(*kwlist)+1];
 	connObject *npgobj;
 
 	pghost = pgopt = pgdbname = pguser = pgpasswd = NULL;
@@ -4435,8 +4593,8 @@
 	 * don't declare kwlist as const char *kwlist[] then it complains when
 	 * I try to assign all those constant strings to it.
 	 */
-	if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzz", (char **) kwlist,
-		&pgdbname, &pghost, &pgport, &pgopt, &pguser, &pgpasswd))
+	if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzi", (char **) kwlist,
+		&pgdbname, &pghost, &pgport, &pgopt, &pguser, &pgpasswd, &nowait))
 		return NULL;
 
 #ifdef DEFAULT_VARS
@@ -4479,8 +4637,55 @@
 	}
 
 	Py_BEGIN_ALLOW_THREADS
-	npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
-		pgopt, NULL, pgdbname, pguser, pgpasswd);
+	if (!nowait)
+	{
+		npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
+			pgopt, NULL, pgdbname, pguser, pgpasswd);
+	}
+	else
+	{
+		if (pghost)
+		{
+			keywords[kwcount] = "host";
+			values[kwcount] = pghost;
+			kwcount++;
+		}
+		if (pgopt)
+		{
+			keywords[kwcount] = "options";
+			values[kwcount] = pgopt;
+			kwcount++;
+		}
+		if (pgdbname)
+		{
+			keywords[kwcount] = "dbname";
+			values[kwcount] = pgdbname;
+			kwcount++;
+		}
+		if (pguser)
+		{
+			keywords[kwcount] = "user";
+			values[kwcount] = pguser;
+			kwcount++;
+		}
+		if (pgpasswd)
+		{
+			keywords[kwcount] = "password";
+			values[kwcount] = pgpasswd;
+			kwcount++;
+		}
+		if (pgport != -1)
+		{
+			keywords[kwcount] = "port";
+			values[kwcount] = port_buffer;
+			kwcount++;
+		}
+		keywords[kwcount] = values[kwcount] = NULL;
+
+		/* The final argument means dbname can be a full connection string,
+		 * which is always the case for PQsetdbLogin() */
+		npgobj->cnx = PQconnectStartParams(keywords, values, 1);
+	}
 	Py_END_ALLOW_THREADS
 
 	if (PQstatus(npgobj->cnx) == CONNECTION_BAD)
@@ -4598,6 +4803,78 @@
 	return PyInt_FromLong(num);
 }
 
+
+/* Retrieve the result set after an async query. This is destructive
+ * so it should only be done when we want the result set (i.e., not
+ * just for an ntuples call). It may also return a non-result value
+ * through *pval. Returns 1 if it's doing that, or 0 otherwise. */
+static int
+queryAsyncResult(queryObject *self, PyObject **pval)
+{
+	/* this is following an async call, so need to get the result first */
+	if (self->async)
+	{
+		int status;
+
+		if (!self->pgcnx)
+		{
+			PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+			*pval = NULL;
+            return 1;
+		}
+
+		Py_BEGIN_ALLOW_THREADS
+		if (self->result)
+		{
+			PQclear(self->result);
+		}
+		self->result = PQgetResult(self->pgcnx->cnx);
+		Py_END_ALLOW_THREADS
+		/* end of result set, return None */
+		if (!self->result)
+		{
+			Py_DECREF(self->pgcnx);
+			self->pgcnx = NULL;
+			Py_INCREF(Py_None);
+			*pval = Py_None;
+            return 1;
+		}
+
+		if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+		{
+			*pval = _check_result_status(status, self->pgcnx->cnx, self->result);
+			/* _check_result_status calls PQclear() so we need to clear this
+			 * attribute */
+			self->result = NULL;
+			/* throwing an exception. Need to call PQgetResult() to clear the
+			 * connection state. This should return NULL the first time. */
+			if (!*pval)
+			{
+				self->result = PQgetResult(self->pgcnx->cnx);
+				while (self->result)
+				{
+					PQclear(self->result);
+					self->result = PQgetResult(self->pgcnx->cnx);
+				}
+			}
+			/* it's confusing to return None here because the caller has to
+			 * call again until we return None. We can't just consume that
+			 * final None because we don't know if there are additional
+			 * statements following this one, so we return an empty string where
+			 * query() would return None */
+			else if (*pval == Py_None)
+			{
+				Py_DECREF(*pval);
+				*pval = PyString_FromString("");
+			}
+			return 1;
+		}
+	}
+    return 0;
+}
+
+
+
 /* retrieves last result */
 static char queryGetResult__doc__[] =
 "getresult() -- Get the result of a query\n\n"
@@ -4607,10 +4884,16 @@
 static PyObject *
 queryGetResult(queryObject *self, PyObject *noargs)
 {
-	PyObject   *reslist;
+	PyObject   *reslist,
+			   *val;
 	int			i, m, n, *col_types;
 	int			encoding = self->encoding;
 
+	if (queryAsyncResult(self, &val))
+	{
+		return val;
+	}
+
 	/* stores result in tuple */
 	m = PQntuples(self->result);
 	n = PQnfields(self->result);
@@ -4632,8 +4915,6 @@
 
 		for (j = 0; j < n; ++j)
 		{
-			PyObject * val;
-
 			if (PQgetisnull(self->result, i, j))
 			{
 				Py_INCREF(Py_None);
@@ -4693,7 +4974,8 @@
 static PyObject *
 queryDictResult(queryObject *self, PyObject *noargs)
 {
-	PyObject   *reslist;
+	PyObject   *reslist,
+			   *val;
 	int			i,
 				m,
 				n,
@@ -4700,6 +4982,11 @@
 			   *col_types;
 	int			encoding = self->encoding;
 
+	if (queryAsyncResult(self, &val))
+	{
+		return val;
+	}
+
 	/* stores result in list */
 	m = PQntuples(self->result);
 	n = PQnfields(self->result);
@@ -6034,6 +6321,12 @@
 	PyDict_SetItemString(dict, "SEEK_END", PyInt_FromLong(SEEK_END));
 #endif /* LARGE_OBJECTS */
 
+	/* PQconnectPoll() results */
+	PyDict_SetItemString(dict,"PGRES_POLLING_OK",PyInt_FromLong(PGRES_POLLING_OK));
+	PyDict_SetItemString(dict,"PGRES_POLLING_FAILED",PyInt_FromLong(PGRES_POLLING_FAILED));
+	PyDict_SetItemString(dict,"PGRES_POLLING_READING",PyInt_FromLong(PGRES_POLLING_READING));
+	PyDict_SetItemString(dict,"PGRES_POLLING_WRITING",PyInt_FromLong(PGRES_POLLING_WRITING));
+
 #ifdef DEFAULT_VARS
 	/* prepares default values */
 	Py_INCREF(Py_None);
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to