Diff
Modified: trunk/docs/contents/changelog.rst (979 => 980)
--- trunk/docs/contents/changelog.rst 2019-04-21 19:28:33 UTC (rev 979)
+++ trunk/docs/contents/changelog.rst 2019-04-22 14:44:58 UTC (rev 980)
@@ -3,18 +3,32 @@
Version 5.1 (2019-mm-dd)
------------------------
-- Support for prepared statements has been added to the classic API.
-- DB wrapper objects based on existing connections can now be closed and
- reopened properly (but the underlying connection will not be affected).
-- The query objects in the classic API can now be used as iterators
- and will then yield the rows as tuples, similar to query.getresult().
- Thanks to Justin Pryzby for the proposal and most of the implementation.
-- Added methods query.dictiter() and query.namediter() to the classic API
- which work like query.dictresult() and query.namedresult() except that
- they return iterators instead of lists.
-- Deprecated query.ntuples() in the classic API, since len(query) can now
- be used and returns the same number.
-- Added pg.get/set_namediter and deprecated pg.get/set_namedresult.
+- Changes in the classic PyGreSQL module (pg):
+ - Support for prepared statements.
+ - DB wrapper objects based on existing connections can now be closed and
+ reopened properly (but the underlying connection will not be affected).
+ - The query object can now be used as iterator and will then yield the
+ rows as tuples, similar to query.getresult().
+ Thanks to Justin Pryzby for the proposal and most of the implementation.
+ - Deprecated query.ntuples() in the classic API, since len(query) can now
+ be used and returns the same number.
+ - The i-th row from the result can now be accessed as the `query[i]`.
+ - New method query.scalarresult() that gets only the first field of each
+ row as a list of scalar values.
+ - New methods query.one(), query.onenamed(), query.onedict() and
+ query.onescalar() that fetch only one row from the result or None if
+ there is no more row, similar to cursor.fetchone() method in DB-API 2.
+ - New methods query.single(), query.singlenamed(), query.singledict() and
+ query.singlescalar() that fetch only one row from the result, and raise
+ an error when the result does not have exactly one row.
+ - New methods query.dictiter(), query.namediter() and query.scalariter()
+ returning the same values as query.dictresult(), query.namedresult()
+ and query.salarresult(), but as iterables instead of lists. This avoids
+ creating a Python list of all results and can be slightly more efficient.
+ - Removed pg.get/set_namedresult. You can configure the named tuples
+ factory with the pg.set_row_factory_size() function and change the
+ implementation with pg.set_query_helps(), but this is not recommended
+ and this function is not part of the official API.
Vesion 5.0.7 (2019-mm-dd)
-------------------------
Modified: trunk/docs/contents/pg/module.rst (979 => 980)
--- trunk/docs/contents/pg/module.rst 2019-04-21 19:28:33 UTC (rev 979)
+++ trunk/docs/contents/pg/module.rst 2019-04-22 14:44:58 UTC (rev 980)
@@ -308,49 +308,6 @@
Note that there is also a :class:`DB` method with the same name
which does exactly the same.
-get/set_namediter -- conversion to named tuples
------------------------------------------------
-
-.. function:: get_namediter()
-
- Get the generator that converts to named tuples
-
-This returns the function used by PyGreSQL to construct the result of the
-:meth:`Query.namediter` and :meth:`Query.namedresult` methods.
-
-.. versionadded:: 5.1
-
-.. function:: set_namediter(func)
-
- Set a generator that will convert to named tuples
-
- :param func: the generator to be used to convert results to named tuples
-
-You can use this if you want to create different kinds of named tuples
-returned by the :meth:`Query.namediter` and :meth:`Query.namedresult` methods.
-If you set this function to *None*, then normal tuples will be used.
-
-.. versionadded:: 5.1
-
-get/set_namedresult -- conversion to named tuples
--------------------------------------------------
-
-.. function:: get_namedresult()
-
- Get the generator that converts to named tuples
-
-.. deprecated:: 5.1
- Use :func:`get_namediter` instead.
-
-.. function:: set_namedresult(func)
-
- Set a generator that will convert to named tuples
-
- :param func: the generator to be used to convert results to named tuples
-
-.. deprecated:: 5.1
- Use :func:`set_namediter` instead.
-
get/set_decimal -- decimal type to be used for numeric values
-------------------------------------------------------------
Modified: trunk/docs/contents/pg/query.rst (979 => 980)
--- trunk/docs/contents/pg/query.rst 2019-04-21 19:28:33 UTC (rev 979)
+++ trunk/docs/contents/pg/query.rst 2019-04-22 14:44:58 UTC (rev 980)
@@ -6,11 +6,11 @@
.. class:: Query
The :class:`Query` object returned by :meth:`Connection.query` and
-:meth:`DB.query` can be used as an iterator returning rows as tuples.
+:meth:`DB.query` can be used as an iterable returning rows as tuples.
You can also directly access row tuples using their index, and get
-the number of rows with the :func:`len` function. The :class:`Query`
-class also provides the following methods for accessing the results
-of the query:
+the number of rows with the :func:`len` function.
+The :class:`Query` class also provides the following methods for accessing
+the results of the query:
getresult -- get query values as list of tuples
-----------------------------------------------
@@ -35,12 +35,16 @@
Since PyGreSQL 5.1 the :class:`Query` can be also used directly as
an iterable sequence, i.e. you can iterate over the :class:`Query`
object to get the same tuples as returned by :meth:`Query.getresult`.
+This is slightly more efficient than getting the full list of results,
+but note that the full result is always fetched from the server anyway
+when the query is executed.
+
You can also call :func:`len` on a query to find the number of rows
in the result, and access row tuples using their index directly on
the :class:`Query` object.
-dictresult -- get query values as list of dictionaries
-------------------------------------------------------
+dictresult/dictiter -- get query values as dictionaries
+-------------------------------------------------------
.. method:: Query.dictresult()
@@ -60,9 +64,28 @@
Note that since PyGreSQL 5.0 this method will return the values of array
type columns as Python lists.
-namedresult -- get query values as list of named tuples
--------------------------------------------------------
+.. method:: Query.dictiter()
+ Get query values as iterable of dictionaries
+
+ :returns: result values as an iterable of dictionaries
+ :rtype: iterable
+ :raises TypeError: too many (any) parameters
+ :raises MemoryError: internal memory error
+
+This method returns query results as an iterable of dictionaries which have
+the field names as keys. This is slightly more efficient than getting the full
+list of results as dictionaries, but note that the full result is always
+fetched from the server anyway when the query is executed.
+
+If the query has duplicate field names, you will get the value for the
+field with the highest index in the query.
+
+.. versionadded:: 5.1
+
+namedresult/namediter -- get query values a named tuples
+--------------------------------------------------------
+
.. method:: Query.namedresult()
Get query values as list of named tuples
@@ -85,49 +108,209 @@
.. versionadded:: 4.1
-dictiter -- get query values as iterator of dictionaries
---------------------------------------------------------
+.. method:: Query.namediter()
-.. method:: Query.dictiter()
+ Get query values as iterable of named tuples
- Get query values as iterator of dictionaries
+ :returns: result values as an iterable of named tuples
+ :rtype: iterable
+ :raises TypeError: too many (any) parameters
+ :raises TypeError: named tuples not supported
+ :raises MemoryError: internal memory error
- :returns: result values as an iterator of dictionaries
- :rtype: iterator
+This method returns query results as an iterable of named tuples with
+proper field names. This is slightly more efficient than getting the full
+list of results as named tuples, but note that the full result is always
+fetched from the server anyway when the query is executed.
+
+Column names in the database that are not valid as field names for
+named tuples (particularly, names starting with an underscore) are
+automatically renamed to valid positional names.
+
+.. versionadded:: 5.1
+
+scalarresult/scalariter -- get query values as scalars
+------------------------------------------------------
+
+.. method:: Query.scalarresult()
+
+ Get first fields from query result as list of scalar values
+
+ :returns: first fields from result as a list of scalar values
+ :rtype: list
:raises TypeError: too many (any) parameters
:raises MemoryError: internal memory error
-This method returns query results as an iterator of dictionaries which have
-the field names as keys.
+This method returns the first fields from the query results as a list of
+scalar values in the order returned by the server.
-If the query has duplicate field names, you will get the value for the
-field with the highest index in the query.
+.. versionadded:: 5.1
+.. method:: Query.scalariter()
+
+ Get first fields from query result as iterable of scalar values
+
+ :returns: first fields from result as an iterable of scalar values
+ :rtype: list
+ :raises TypeError: too many (any) parameters
+ :raises MemoryError: internal memory error
+
+This method returns the first fields from the query results as an iterable
+of scalar values in the order returned by the server. This is slightly more
+efficient than getting the full list of results as rows or scalar values,
+but note that the full result is always fetched from the server anyway when
+the query is executed.
+
.. versionadded:: 5.1
-namediter -- get query values as iterator of named tuples
----------------------------------------------------------
+one/onedict/onenamed/onescalar -- get one result of a query
+-----------------------------------------------------------
-.. method:: Query.namediter()
+.. method:: Query.one()
- Get query values as iterator of named tuples
+ Get one row from the result of a query as a tuple
- :returns: result values as an iterator of named tuples
- :rtype: iterator
+ :returns: next row from the query results as a tuple of fields
+ :rtype: tuple or None
:raises TypeError: too many (any) parameters
- :raises TypeError: named tuples not supported
:raises MemoryError: internal memory error
-This method returns query results as an iterator of named tuples with
-proper field names.
+Returns only one row from the result as a tuple of fields.
+This method can be called multiple times to return more rows.
+It returns None if the result does not contain one more row.
+
+.. versionadded:: 5.1
+
+.. method:: Query.onedict()
+
+ Get one row from the result of a query as a dictionary
+
+ :returns: next row from the query results as a dictionary
+ :rtype: dict or None
+ :raises TypeError: too many (any) parameters
+ :raises MemoryError: internal memory error
+
+Returns only one row from the result as a dictionary with the field names
+used as the keys.
+
+This method can be called multiple times to return more rows.
+It returns None if the result does not contain one more row.
+
+.. versionadded:: 5.1
+
+.. method:: Query.onenamed()
+
+ Get one row from the result of a query as named tuple
+
+ :returns: next row from the query results as a named tuple
+ :rtype: named tuple or None
+ :raises TypeError: too many (any) parameters
+ :raises MemoryError: internal memory error
+
+Returns only one row from the result as a named tuple with proper field names.
+
Column names in the database that are not valid as field names for
named tuples (particularly, names starting with an underscore) are
automatically renamed to valid positional names.
+This method can be called multiple times to return more rows.
+It returns None if the result does not contain one more row.
+
.. versionadded:: 5.1
+.. method:: Query.onescalar()
+ Get one row from the result of a query as scalar value
+
+ :returns: next row from the query results as a scalar value
+ :rtype: type of first field or None
+ :raises TypeError: too many (any) parameters
+ :raises MemoryError: internal memory error
+
+Returns the first field of the next row from the result as a scalar value.
+
+This method can be called multiple times to return more rows as scalars.
+It returns None if the result does not contain one more row.
+
+.. versionadded:: 5.1
+
+single/singledict/singlenamed/singlescalar -- get single result of a query
+--------------------------------------------------------------------------
+
+.. method:: Query.single()
+
+ Get single row from the result of a query as a tuple
+
+ :returns: single row from the query results as a tuple of fields
+ :rtype: tuple
+ :raises ProgrammingError: result does not have exactly one row
+ :raises TypeError: too many (any) parameters
+ :raises MemoryError: internal memory error
+
+Returns a single row from the result as a tuple of fields.
+
+This method returns the same single row when called multiple times.
+It raises a ProgrammingError if the result does not have exactly one row.
+
+.. versionadded:: 5.1
+
+.. method:: Query.singledict()
+
+ Get single row from the result of a query as a dictionary
+
+ :returns: single row from the query results as a dictionary
+ :rtype: dict
+ :raises ProgrammingError: result does not have exactly one row
+ :raises TypeError: too many (any) parameters
+ :raises MemoryError: internal memory error
+
+Returns a single row from the result as a dictionary with the field names
+used as the keys.
+
+This method returns the same single row when called multiple times.
+It raises a ProgrammingError if the result does not have exactly one row.
+
+.. versionadded:: 5.1
+
+.. method:: Query.singlenamed()
+
+ Get single row from the result of a query as named tuple
+
+ :returns: single row from the query results as a named tuple
+ :rtype: named tuple
+ :raises ProgrammingError: result does not have exactly one row
+ :raises TypeError: too many (any) parameters
+ :raises MemoryError: internal memory error
+
+Returns single row from the result as a named tuple with proper field names.
+
+Column names in the database that are not valid as field names for
+named tuples (particularly, names starting with an underscore) are
+automatically renamed to valid positional names.
+
+This method returns the same single row when called multiple times.
+It raises a ProgrammingError if the result does not have exactly one row.
+
+.. versionadded:: 5.1
+
+.. method:: Query.singlescalar()
+
+ Get single row from the result of a query as scalar value
+
+ :returns: single row from the query results as a scalar value
+ :rtype: type of first field
+ :raises ProgrammingError: result does not have exactly one row
+ :raises TypeError: too many (any) parameters
+ :raises MemoryError: internal memory error
+
+Returns the first field of a single row from the result as a scalar value.
+
+This method returns the same single row as scalar when called multiple times.
+It raises a ProgrammingError if the result does not have exactly one row.
+
+.. versionadded:: 5.1
+
listfields -- list fields names of previous query result
--------------------------------------------------------
Modified: trunk/docs/contents/tutorial.rst (979 => 980)
--- trunk/docs/contents/tutorial.rst 2019-04-21 19:28:33 UTC (rev 979)
+++ trunk/docs/contents/tutorial.rst 2019-04-22 14:44:58 UTC (rev 980)
@@ -107,9 +107,11 @@
'durian'
In PyGreSQL 5.1 and newer, you can also use the :class:`Query` instance
-directly as an iterator that yields the rows as tuples, and you can use
-the methods :meth:`Query.dictiter` or :meth:`Query.namediter` to get
-iterators yielding the rows as dictionaries or named tuples.
+directly as an iterable that yields the rows as tuples, and there are also
+methods that return iterables for rows as dictionaries, named tuples or
+scalar values. Other methods like :meth:`Query.one` or :meth:`Query.onescalar`
+return only one row or only the first field of that row. You can get the
+number of rows with the :func:`len` function.
Using the method :meth:`DB.get_as_dict`, you can easily import the whole table
into a Python dictionary mapping the primary key *id* to the *name*::
Modified: trunk/pg.py (979 => 980)
--- trunk/pg.py 2019-04-21 19:28:33 UTC (rev 979)
+++ trunk/pg.py 2019-04-22 14:44:58 UTC (rev 980)
@@ -1311,6 +1311,8 @@
_row_factory = lru_cache(maxsize)(_row_factory.__wrapped__)
+# Helper functions used by the query object
+
def _dictiter(q):
"""Get query result as an iterator of dictionaries."""
fields = q.listfields()
@@ -1325,6 +1327,17 @@
yield row(r)
+def _namednext(q):
+ """Get next row from query result as a named tuple."""
+ return _row_factory(q.listfields())(next(q))
+
+
+def _scalariter(q):
+ """Get query result as an iterator of scalar values."""
+ for r in q:
+ yield r[0]
+
+
class _MemoryQuery:
"""Class that embodies a given query result."""
@@ -1364,10 +1377,9 @@
# Initialize the C module
-set_dictiter(_dictiter)
-set_namediter(_namediter)
set_decimal(Decimal)
set_jsondecode(jsondecode)
+set_query_helpers(_dictiter, _namediter, _namednext, _scalariter)
# The notification handler
@@ -2678,13 +2690,11 @@
rowtuple = True
rows = map(getrow, res)
if keytuple or rowtuple:
- namedresult = get_namedresult()
- if namedresult:
- if keytuple:
- keys = namedresult(_MemoryQuery(keys, keyname))
- if rowtuple:
- fields = [f for f in fields if f not in keyset]
- rows = namedresult(_MemoryQuery(rows, fields))
+ if keytuple:
+ keys = _namediter(_MemoryQuery(keys, keyname))
+ if rowtuple:
+ fields = [f for f in fields if f not in keyset]
+ rows = _namediter(_MemoryQuery(rows, fields))
return cls(zip(keys, rows))
def notification_handler(self,
Modified: trunk/pgmodule.c (979 => 980)
--- trunk/pgmodule.c 2019-04-21 19:28:33 UTC (rev 979)
+++ trunk/pgmodule.c 2019-04-22 14:44:58 UTC (rev 980)
@@ -91,6 +91,8 @@
static PyObject *decimal = NULL, /* decimal type */
*dictiter = NULL, /* function for getting named results */
*namediter = NULL, /* function for getting named results */
+ *namednext = NULL, /* function for getting one named result */
+ *scalariter = NULL, /* function for getting scalar results */
*jsondecode = NULL; /* function for decoding json strings */
static const char *date_format = NULL; /* date format that is always assumed */
static char decimal_point = '.'; /* decimal point used in money values */
@@ -4830,9 +4832,54 @@
row_tuple = queryGetRowAsTuple(self);
if (row_tuple) ++self->current_row;
- return row_tuple;
+ return row_tuple;
}
+/* Retrieves one row from the result as a tuple. */
+static char queryOne__doc__[] =
+"one() -- Get one row from the result of a query\n\n"
+"Only one row from the result is returned as a tuple of fields.\n"
+"This method can be called multiple times to return more rows.\n"
+"It returns None if the result does not contain one more row.\n";
+
+static PyObject *
+queryOne(queryObject *self, PyObject *noargs)
+{
+ PyObject *row_tuple;
+
+ if (self->current_row >= self->max_row) {
+ Py_INCREF(Py_None); return Py_None;
+ }
+
+ row_tuple = queryGetRowAsTuple(self);
+ if (row_tuple) ++self->current_row;
+ return row_tuple;
+}
+
+/* Retrieves the single row from the result as a tuple. */
+static char querySingle__doc__[] =
+"single() -- Get the result of a query as single row\n\n"
+"The single row from the query result is returned as a tuple of fields.\n"
+"This method returns the same single row when called multiple times.\n"
+"It raises a ProgrammingError if the result does not have exactly one row.\n";
+
+static PyObject *
+querySingle(queryObject *self, PyObject *noargs)
+{
+ PyObject *row_tuple;
+
+ if (self->max_row != 1) {
+ set_error_msg(ProgrammingError,
+ self->max_row ? "Multiple results found" : "No result found");
+ return NULL;
+ }
+
+ self->current_row = 0;
+ row_tuple = queryGetRowAsTuple(self);
+ if (row_tuple) ++self->current_row;
+ return row_tuple;
+}
+
/* Retrieves the last query result as a list of tuples. */
static char queryGetresult__doc__[] =
"getresult() -- Get the result of a query\n\n"
@@ -4899,6 +4946,53 @@
return row_dict;
}
+/* Retrieve one row from the result as a dictionary. */
+static char queryOnedict__doc__[] =
+"onedict() -- Get one row from the result of a query\n\n"
+"Only one row from the result is returned as a dictionary with\n"
+"the field names used as the keys.\n"
+"This method can be called multiple times to return more rows.\n"
+"It returns None if the result does not contain one more row.\n";
+
+static PyObject *
+queryOnedict(queryObject *self, PyObject *noargs)
+{
+ PyObject *row_dict;
+
+ if (self->current_row >= self->max_row) {
+ Py_INCREF(Py_None); return Py_None;
+ }
+
+ row_dict = queryGetRowAsDict(self);
+ if (row_dict) ++self->current_row;
+ return row_dict;
+}
+
+/* Retrieve the single row from the result as a dictionary. */
+static char querySingledict__doc__[] =
+"singledict() -- Get the result of a query as single row\n\n"
+"The single row from the query result is returned as a dictionary with\n"
+"the field names used as the keys.\n"
+"This method returns the same single row when called multiple times.\n"
+"It raises a ProgrammingError if the result does not have exactly one row.\n";
+
+static PyObject *
+querySingledict(queryObject *self, PyObject *noargs)
+{
+ PyObject *row_dict;
+
+ if (self->max_row != 1) {
+ set_error_msg(ProgrammingError,
+ self->max_row ? "Multiple results found" : "No result found");
+ return NULL;
+ }
+
+ self->current_row = 0;
+ row_dict = queryGetRowAsDict(self);
+ if (row_dict) ++self->current_row;
+ return row_dict;
+}
+
/* Retrieve the last query result as a list of dictionaries. */
static char queryDictresult__doc__[] =
"dictresult() -- Get the result of a query\n\n"
@@ -4926,7 +5020,7 @@
return result_list;
}
-/* retrieves last result as iterator of dictionaries */
+/* Retrieve last result as iterator of dictionaries. */
static char queryDictiter__doc__[] =
"dictiter() -- Get the result of a query\n\n"
"The result is returned as an iterator of rows, each one a a dictionary\n"
@@ -4935,14 +5029,56 @@
static PyObject *
queryDictiter(queryObject *self, PyObject *noargs)
{
- if (dictiter) {
- return PyObject_CallFunction(dictiter, "(O)", self);
+ if (!dictiter)
+ return queryDictresult(self, noargs);
+
+ return PyObject_CallFunction(dictiter, "(O)", self);
+}
+
+/* Retrieve one row from the result as a named tuple. */
+static char queryOnenamed__doc__[] =
+"onenamed() -- Get one row from the result of a query\n\n"
+"Only one row from the result is returned as a named tuple of fields.\n"
+"This method can be called multiple times to return more rows.\n"
+"It returns None if the result does not contain one more row.\n";
+
+static PyObject *
+queryOnenamed(queryObject *self, PyObject *noargs)
+{
+ if (!namednext)
+ return queryOne(self, noargs);
+
+ if (self->current_row >= self->max_row) {
+ Py_INCREF(Py_None); return Py_None;
}
- return queryGetIter(self);
+
+ return PyObject_CallFunction(namednext, "(O)", self);
}
+/* Retrieve the single row from the result as a tuple. */
+static char querySinglenamed__doc__[] =
+"singlenamed() -- Get the result of a query as single row\n\n"
+"The single row from the query result is returned as a named tuple of fields.\n"
+"This method returns the same single row when called multiple times.\n"
+"It raises a ProgrammingError if the result does not have exactly one row.\n";
-/* retrieves last result as list of named tuples */
+static PyObject *
+querySinglenamed(queryObject *self, PyObject *noargs)
+{
+ if (!namednext)
+ return querySingle(self, noargs);
+
+ if (self->max_row != 1) {
+ set_error_msg(ProgrammingError,
+ self->max_row ? "Multiple results found" : "No result found");
+ return NULL;
+ }
+
+ self->current_row = 0;
+ return PyObject_CallFunction(namednext, "(O)", self);
+}
+
+/* Retrieve last result as list of named tuples. */
static char queryNamedresult__doc__[] =
"namedresult() -- Get the result of a query\n\n"
"The result is returned as a list of rows, each one a named tuple of fields\n"
@@ -4951,18 +5087,20 @@
static PyObject *
queryNamedresult(queryObject *self, PyObject *noargs)
{
- if (namediter) {
- PyObject* res = PyObject_CallFunction(namediter, "(O)", self);
- if (res && PyList_Check(res))
- return res;
- PyObject *res_list = PySequence_List(res);
- Py_DECREF(res);
- return res_list;
- }
- return queryGetresult(self, noargs);
+ PyObject *res, *res_list;
+
+ if (!namediter)
+ return queryGetresult(self, noargs);
+
+ res = PyObject_CallFunction(namediter, "(O)", self);
+ if (!res) return NULL;
+ if (PyList_Check(res)) return res;
+ res_list = PySequence_List(res);
+ Py_DECREF(res);
+ return res_list;
}
-/* retrieves last result as iterator of named tuples */
+/* Retrieve last result as iterator of named tuples. */
static char queryNamediter__doc__[] =
"namediter() -- Get the result of a query\n\n"
"The result is returned as an iterator of rows, each one a named tuple\n"
@@ -4971,17 +5109,126 @@
static PyObject *
queryNamediter(queryObject *self, PyObject *noargs)
{
- if (namediter) {
- PyObject* res = PyObject_CallFunction(namediter, "(O)", self);
- if (res && !PyList_Check(res))
- return res;
- PyObject* res_iter = (Py_TYPE(res)->tp_iter)((PyObject *)self);
- Py_DECREF(res);
- return res_iter;
+ PyObject *res, *res_iter;
+
+ if (!namediter)
+ return queryGetIter(self);
+
+ res = PyObject_CallFunction(namediter, "(O)", self);
+ if (!res) return NULL;
+ if (!PyList_Check(res)) return res;
+ res_iter = (Py_TYPE(res)->tp_iter)((PyObject *)self);
+ Py_DECREF(res);
+ return res_iter;
+}
+
+/* Retrieve the last query result as a list of scalar values. */
+static char queryScalarresult__doc__[] =
+"scalarresult() -- Get query result as scalars\n\n"
+"The result is returned as a list of scalar values where the values\n"
+"are the first fields of the rows in the order returned by the server.\n";
+
+static PyObject *
+queryScalarresult(queryObject *self, PyObject *noargs)
+{
+ PyObject *result_list;
+
+ if (!self->num_fields) {
+ set_error_msg(ProgrammingError, "No fields in result");
+ return NULL;
}
- return queryGetIter(self);
+
+ if (!(result_list = PyList_New(self->max_row))) return NULL;
+
+ for (self->current_row = 0; self->current_row < self->max_row;
+ ++self->current_row)
+ {
+ PyObject *value = getValueInColumn(self, 0);
+ if (!value)
+ {
+ Py_DECREF(result_list); return NULL;
+ }
+ PyList_SET_ITEM(result_list, self->current_row, value);
+ }
+
+ return result_list;
}
+/* Retrieve the last query result as iterator of scalar values. */
+static char queryScalariter__doc__[] =
+"scalariter() -- Get query result as scalars\n\n"
+"The result is returned as an iterator of scalar values where the values\n"
+"are the first fields of the rows in the order returned by the server.\n";
+
+static PyObject *
+queryScalariter(queryObject *self, PyObject *noargs)
+{
+ if (!scalariter)
+ return queryScalarresult(self, noargs);
+
+ if (!self->num_fields) {
+ set_error_msg(ProgrammingError, "No fields in result");
+ return NULL;
+ }
+
+ return PyObject_CallFunction(scalariter, "(O)", self);
+}
+
+/* Retrieve one result as scalar value. */
+static char queryOnescalar__doc__[] =
+"onescalar() -- Get one scalar value from the result of a query\n\n"
+"Returns the first field of the next row from the result as a scalar value.\n"
+"This method can be called multiple times to return more rows as scalars.\n"
+"It returns None if the result does not contain one more row.\n";
+
+static PyObject *
+queryOnescalar(queryObject *self, PyObject *noargs)
+{
+ PyObject *value;
+
+ if (!self->num_fields) {
+ set_error_msg(ProgrammingError, "No fields in result");
+ return NULL;
+ }
+
+ if (self->current_row >= self->max_row) {
+ Py_INCREF(Py_None); return Py_None;
+ }
+
+ value = getValueInColumn(self, 0);
+ if (value) ++self->current_row;
+ return value;
+}
+
+/* Retrieves the single row from the result as a tuple. */
+static char querySinglescalar__doc__[] =
+"singlescalar() -- Get scalar value from single result of a query\n\n"
+"Returns the first field of the next row from the result as a scalar value.\n"
+"This method returns the same single row when called multiple times.\n"
+"It raises a ProgrammingError if the result does not have exactly one row.\n";
+
+static PyObject *
+querySinglescalar(queryObject *self, PyObject *noargs)
+{
+ PyObject *value;
+
+ if (!self->num_fields) {
+ set_error_msg(ProgrammingError, "No fields in result");
+ return NULL;
+ }
+
+ if (self->max_row != 1) {
+ set_error_msg(ProgrammingError,
+ self->max_row ? "Multiple results found" : "No result found");
+ return NULL;
+ }
+
+ self->current_row = 0;
+ value = getValueInColumn(self, 0);
+ if (value) ++self->current_row;
+ return value;
+}
+
/* Return length of a query object. */
static Py_ssize_t
queryLen(PyObject *self)
@@ -5144,6 +5391,24 @@
queryNamedresult__doc__},
{"namediter", (PyCFunction) queryNamediter, METH_NOARGS,
queryNamediter__doc__},
+ {"one", (PyCFunction) queryOne, METH_NOARGS, queryOne__doc__},
+ {"single", (PyCFunction) querySingle, METH_NOARGS, querySingle__doc__},
+ {"onedict", (PyCFunction) queryOnedict, METH_NOARGS,
+ queryOnedict__doc__},
+ {"singledict", (PyCFunction) querySingledict, METH_NOARGS,
+ querySingledict__doc__},
+ {"onenamed", (PyCFunction) queryOnenamed, METH_NOARGS,
+ queryOnenamed__doc__},
+ {"singlenamed", (PyCFunction) querySinglenamed, METH_NOARGS,
+ querySinglenamed__doc__},
+ {"scalarresult", (PyCFunction) queryScalarresult, METH_NOARGS,
+ queryScalarresult__doc__},
+ {"scalariter", (PyCFunction) queryScalariter, METH_NOARGS,
+ queryScalariter__doc__},
+ {"onescalar", (PyCFunction) queryOnescalar, METH_NOARGS,
+ queryOnescalar__doc__},
+ {"singlescalar", (PyCFunction) querySinglescalar, METH_NOARGS,
+ querySinglescalar__doc__},
{"fieldname", (PyCFunction) queryFieldname, METH_VARARGS,
queryFieldname__doc__},
{"fieldnum", (PyCFunction) queryFieldnum, METH_VARARGS,
@@ -5598,90 +5863,25 @@
return ret;
}
-/* get dict result factory */
-static char pgGetDictiter__doc__[] =
-"get_dictiter() -- get the generator used for getting dict results";
+/* set query helper functions */
-static PyObject *
-pgGetDictiter(PyObject *self, PyObject *noargs)
-{
- PyObject *ret;
+static char pgSetQueryHelpers__doc__[] =
+"set_query_helpers(*helpers) -- set internal query helper functions";
- ret = dictiter ? dictiter : Py_None;
- Py_INCREF(ret);
-
- return ret;
-}
-
-/* set dict result factory */
-static char pgSetDictiter__doc__[] =
-"set_dictiter(func) -- set a generator to be used for getting dict results";
-
static PyObject *
-pgSetDictiter(PyObject *self, PyObject *func)
+pgSetQueryHelpers(PyObject *self, PyObject *args)
{
- PyObject *ret = NULL;
+ /* gets arguments */
+ if (!PyArg_ParseTuple(args, "O!O!O!O!",
+ &PyFunction_Type, &dictiter,
+ &PyFunction_Type, &namediter,
+ &PyFunction_Type, &namednext,
+ &PyFunction_Type, &scalariter)) return NULL;
- if (func == Py_None)
- {
- Py_XDECREF(dictiter); dictiter = NULL;
- Py_INCREF(Py_None); ret = Py_None;
- }
- else if (PyCallable_Check(func))
- {
- Py_XINCREF(func); Py_XDECREF(dictiter); dictiter = func;
- Py_INCREF(Py_None); ret = Py_None;
- }
- else
- PyErr_SetString(PyExc_TypeError,
- "Function set_dictiter() expects"
- " a callable or None as argument");
-
- return ret;
+ Py_INCREF(Py_None);
+ return Py_None;
}
-/* get named result factory */
-static char pgGetNamediter__doc__[] =
-"get_namediter() -- get the generator used for getting named results";
-
-static PyObject *
-pgGetNamediter(PyObject *self, PyObject *noargs)
-{
- PyObject *ret;
-
- ret = namediter ? namediter : Py_None;
- Py_INCREF(ret);
-
- return ret;
-}
-
-/* set named result factory */
-static char pgSetNamediter__doc__[] =
-"set_namediter(func) -- set a generator to be used for getting named results";
-
-static PyObject *
-pgSetNamediter(PyObject *self, PyObject *func)
-{
- PyObject *ret = NULL;
-
- if (func == Py_None)
- {
- Py_XDECREF(namediter); namediter = NULL;
- Py_INCREF(Py_None); ret = Py_None;
- }
- else if (PyCallable_Check(func))
- {
- Py_XINCREF(func); Py_XDECREF(namediter); namediter = func;
- Py_INCREF(Py_None); ret = Py_None;
- }
- else
- PyErr_SetString(PyExc_TypeError,
- "Function set_namediter() expects"
- " a callable or None as argument");
-
- return ret;
-}
-
/* get json decode function */
static char pgGetJsondecode__doc__[] =
"get_jsondecode() -- get the function used for decoding json results";
@@ -6166,23 +6366,12 @@
{"set_bool", (PyCFunction) pgSetBool, METH_VARARGS, pgSetBool__doc__},
{"get_array", (PyCFunction) pgGetArray, METH_NOARGS, pgGetArray__doc__},
{"set_array", (PyCFunction) pgSetArray, METH_VARARGS, pgSetArray__doc__},
+ {"set_query_helpers", (PyCFunction) pgSetQueryHelpers, METH_VARARGS,
+ pgSetQueryHelpers__doc__},
{"get_bytea_escaped", (PyCFunction) pgGetByteaEscaped, METH_NOARGS,
pgGetByteaEscaped__doc__},
{"set_bytea_escaped", (PyCFunction) pgSetByteaEscaped, METH_VARARGS,
pgSetByteaEscaped__doc__},
- {"get_dictiter", (PyCFunction) pgGetDictiter, METH_NOARGS,
- pgGetDictiter__doc__},
- {"set_dictiter", (PyCFunction) pgSetDictiter, METH_O,
- pgSetDictiter__doc__},
- {"get_namediter", (PyCFunction) pgGetNamediter, METH_NOARGS,
- pgGetNamediter__doc__},
- {"set_namediter", (PyCFunction) pgSetNamediter, METH_O,
- pgSetNamediter__doc__},
- /* get/set_namedresult is deprecated, use get/set_namediter */
- {"get_namedresult", (PyCFunction) pgGetNamediter, METH_NOARGS,
- pgGetNamediter__doc__},
- {"set_namedresult", (PyCFunction) pgSetNamediter, METH_O,
- pgSetNamediter__doc__},
{"get_jsondecode", (PyCFunction) pgGetJsondecode, METH_NOARGS,
pgGetJsondecode__doc__},
{"set_jsondecode", (PyCFunction) pgSetJsondecode, METH_O,
Modified: trunk/tests/test_classic_connection.py (979 => 980)
--- trunk/tests/test_classic_connection.py 2019-04-21 19:28:33 UTC (rev 979)
+++ trunk/tests/test_classic_connection.py 2019-04-22 14:44:58 UTC (rev 980)
@@ -1166,6 +1166,33 @@
self.assertIn((8,), r)
self.assertNotIn((5,), r)
+ def testDictIterate(self):
+ r = self.c.query("select generate_series(3,5) as n").dictiter()
+ self.assertNotIsInstance(r, (list, tuple))
+ self.assertIsInstance(r, Iterable)
+ r = list(r)
+ self.assertEqual(r, [dict(n=3), dict(n=4), dict(n=5)])
+ self.assertIsInstance(r[1], dict)
+
+ def testDictIterateTwoColumns(self):
+ r = self.c.query("select 1 as one, 2 as two"
+ " union select 3 as one, 4 as two").dictiter()
+ self.assertIsInstance(r, Iterable)
+ r = list(r)
+ self.assertEqual(r, [dict(_one_=1, two=2), dict(_one_=3, two=4)])
+
+ def testDictNext(self):
+ r = self.c.query("select generate_series(7,9) as n").dictiter()
+ self.assertEqual(next(r), dict(n=7))
+ self.assertEqual(next(r), dict(n=8))
+ self.assertEqual(next(r), dict(n=9))
+ self.assertRaises(StopIteration, next, r)
+
+ def testDictContains(self):
+ r = self.c.query("select generate_series(7,9) as n").dictiter()
+ self.assertIn(dict(n=8), r)
+ self.assertNotIn(dict(n=5), r)
+
def testNamedIterate(self):
r = self.c.query("select generate_series(3,5) as number").namediter()
self.assertNotIsInstance(r, (list, tuple))
@@ -1201,34 +1228,257 @@
self.assertIn((8,), r)
self.assertNotIn((5,), r)
- def testDictIterate(self):
- r = self.c.query("select generate_series(3,5) as n").dictiter()
+ def testScalarIterate(self):
+ r = self.c.query("select generate_series(3,5)").scalariter()
self.assertNotIsInstance(r, (list, tuple))
self.assertIsInstance(r, Iterable)
r = list(r)
- self.assertEqual(r, [dict(n=3), dict(n=4), dict(n=5)])
- self.assertIsInstance(r[1], dict)
+ self.assertEqual(r, [3, 4, 5])
+ self.assertIsInstance(r[1], int)
- def testDictIterateTwoColumns(self):
- r = self.c.query("select 1 as one, 2 as two"
- " union select 3 as one, 4 as two").dictiter()
+ def testScalarIterateTwoColumns(self):
+ r = self.c.query("select 1, 2 union select 3, 4").scalariter()
self.assertIsInstance(r, Iterable)
r = list(r)
- self.assertEqual(r, [dict(_one_=1, two=2), dict(_one_=3, two=4)])
+ self.assertEqual(r, [1, 3])
- def testDictNext(self):
- r = self.c.query("select generate_series(7,9) as n").dictiter()
- self.assertEqual(next(r), dict(n=7))
- self.assertEqual(next(r), dict(n=8))
- self.assertEqual(next(r), dict(n=9))
+ def testScalarNext(self):
+ r = self.c.query("select generate_series(7,9)").scalariter()
+ self.assertEqual(next(r), 7)
+ self.assertEqual(next(r), 8)
+ self.assertEqual(next(r), 9)
self.assertRaises(StopIteration, next, r)
- def testNamedContains(self):
- r = self.c.query("select generate_series(7,9) as n").dictiter()
- self.assertIn(dict(n=8), r)
- self.assertNotIn(dict(n=5), r)
+ def testScalarContains(self):
+ r = self.c.query("select generate_series(7,9)").scalariter()
+ self.assertIn(8, r)
+ self.assertNotIn(5, r)
+class TestQueryOneSingleScalar(unittest.TestCase):
+ """Test the query methods for getting single rows and columns."""
+
+ def setUp(self):
+ self.c = connect()
+
+ def tearDown(self):
+ self.c.close()
+
+ def testOneWithEmptyQuery(self):
+ q = self.c.query("select 0 where false")
+ self.assertIsNone(q.one())
+
+ def testOneWithSingleRow(self):
+ q = self.c.query("select 1, 2")
+ r = q.one()
+ self.assertIsInstance(r, tuple)
+ self.assertEqual(r, (1, 2))
+ self.assertEqual(q.one(), None)
+
+ def testOneWithTwoRows(self):
+ q = self.c.query("select 1, 2 union select 3, 4")
+ self.assertEqual(q.one(), (1, 2))
+ self.assertEqual(q.one(), (3, 4))
+ self.assertEqual(q.one(), None)
+
+ def testOneDictWithEmptyQuery(self):
+ q = self.c.query("select 0 where false")
+ self.assertIsNone(q.onedict())
+
+ def testOneDictWithSingleRow(self):
+ q = self.c.query("select 1 as one, 2 as two")
+ r = q.onedict()
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r, dict(_one_=1, two=2))
+ self.assertEqual(q.onedict(), None)
+
+ def testOneDictWithTwoRows(self):
+ q = self.c.query(
+ "select 1 as one, 2 as two union select 3 as one, 4 as two")
+ self.assertEqual(q.onedict(), dict(_one_=1, two=2))
+ self.assertEqual(q.onedict(), dict(_one_=3, two=4))
+ self.assertEqual(q.onedict(), None)
+
+ def testOneNamedWithEmptyQuery(self):
+ q = self.c.query("select 0 where false")
+ self.assertIsNone(q.onenamed())
+
+ def testOneNamedWithSingleRow(self):
+ q = self.c.query("select 1 as one, 2 as two")
+ r = q.onenamed()
+ self.assertEqual(r._fields, ('one', 'two'))
+ self.assertEqual(r.one, 1)
+ self.assertEqual(r.two, 2)
+ self.assertEqual(r, (1, 2))
+ self.assertEqual(q.onenamed(), None)
+
+ def testOneNamedWithTwoRows(self):
+ q = self.c.query(
+ "select 1 as one, 2 as two union select 3 as one, 4 as two")
+ r = q.onenamed()
+ self.assertEqual(r._fields, ('one', 'two'))
+ self.assertEqual(r.one, 1)
+ self.assertEqual(r.two, 2)
+ self.assertEqual(r, (1, 2))
+ r = q.onenamed()
+ self.assertEqual(r._fields, ('one', 'two'))
+ self.assertEqual(r.one, 3)
+ self.assertEqual(r.two, 4)
+ self.assertEqual(r, (3, 4))
+ self.assertEqual(q.onenamed(), None)
+
+ def testOneScalarWithEmptyQuery(self):
+ q = self.c.query("select 0 where false")
+ self.assertIsNone(q.onescalar())
+
+ def testOneScalarWithSingleRow(self):
+ q = self.c.query("select 1, 2")
+ r = q.onescalar()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, 1)
+ self.assertEqual(q.onescalar(), None)
+
+ def testOneScalarWithTwoRows(self):
+ q = self.c.query("select 1, 2 union select 3, 4")
+ self.assertEqual(q.onescalar(), 1)
+ self.assertEqual(q.onescalar(), 3)
+ self.assertEqual(q.onescalar(), None)
+
+ def testSingleWithEmptyQuery(self):
+ q = self.c.query("select 0 where false")
+ try:
+ q.single()
+ except pg.ProgrammingError as e:
+ r = str(e)
+ else:
+ r = None
+ self.assertEqual(r, 'No result found')
+
+ def testSingleWithSingleRow(self):
+ q = self.c.query("select 1, 2")
+ r = q.single()
+ self.assertIsInstance(r, tuple)
+ self.assertEqual(r, (1, 2))
+ r = q.single()
+ self.assertIsInstance(r, tuple)
+ self.assertEqual(r, (1, 2))
+
+ def testSingleWithTwoRows(self):
+ q = self.c.query("select 1, 2 union select 3, 4")
+ try:
+ q.single()
+ except pg.ProgrammingError as e:
+ r = str(e)
+ else:
+ r = None
+ self.assertEqual(r, 'Multiple results found')
+
+ def testSingleDictWithEmptyQuery(self):
+ q = self.c.query("select 0 where false")
+ try:
+ q.singledict()
+ except pg.ProgrammingError as e:
+ r = str(e)
+ else:
+ r = None
+ self.assertEqual(r, 'No result found')
+
+ def testSingleDictWithSingleRow(self):
+ q = self.c.query("select 1 as one, 2 as two")
+ r = q.singledict()
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r, dict(_one_=1, two=2))
+ r = q.singledict()
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r, dict(_one_=1, two=2))
+
+ def testSingleDictWithTwoRows(self):
+ q = self.c.query("select 1, 2 union select 3, 4")
+ try:
+ q.singledict()
+ except pg.ProgrammingError as e:
+ r = str(e)
+ else:
+ r = None
+ self.assertEqual(r, 'Multiple results found')
+
+ def testSingleNamedWithEmptyQuery(self):
+ q = self.c.query("select 0 where false")
+ try:
+ q.singlenamed()
+ except pg.ProgrammingError as e:
+ r = str(e)
+ else:
+ r = None
+ self.assertEqual(r, 'No result found')
+
+ def testSingleNamedWithSingleRow(self):
+ q = self.c.query("select 1 as one, 2 as two")
+ r = q.singlenamed()
+ self.assertEqual(r._fields, ('one', 'two'))
+ self.assertEqual(r.one, 1)
+ self.assertEqual(r.two, 2)
+ self.assertEqual(r, (1, 2))
+ r = q.singlenamed()
+ self.assertEqual(r._fields, ('one', 'two'))
+ self.assertEqual(r.one, 1)
+ self.assertEqual(r.two, 2)
+ self.assertEqual(r, (1, 2))
+
+ def testSingleNamedWithTwoRows(self):
+ q = self.c.query("select 1, 2 union select 3, 4")
+ try:
+ q.singlenamed()
+ except pg.ProgrammingError as e:
+ r = str(e)
+ else:
+ r = None
+ self.assertEqual(r, 'Multiple results found')
+
+ def testSingleScalarWithEmptyQuery(self):
+ q = self.c.query("select 0 where false")
+ try:
+ q.singlescalar()
+ except pg.ProgrammingError as e:
+ r = str(e)
+ else:
+ r = None
+ self.assertEqual(r, 'No result found')
+
+ def testSingleScalarWithSingleRow(self):
+ q = self.c.query("select 1, 2")
+ r = q.singlescalar()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, 1)
+ r = q.singlescalar()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, 1)
+
+ def testSingleWithTwoRows(self):
+ q = self.c.query("select 1, 2 union select 3, 4")
+ try:
+ q.singlescalar()
+ except pg.ProgrammingError as e:
+ r = str(e)
+ else:
+ r = None
+ self.assertEqual(r, 'Multiple results found')
+
+ def testScalarResult(self):
+ q = self.c.query("select 1, 2 union select 3, 4")
+ r = q.scalarresult()
+ self.assertIsInstance(r, list)
+ self.assertEqual(r, [1, 3])
+
+ def testScalarIter(self):
+ q = self.c.query("select 1, 2 union select 3, 4")
+ r = q.scalariter()
+ self.assertNotIsInstance(r, (list, tuple))
+ self.assertIsInstance(r, Iterable)
+ r = list(r)
+ self.assertEqual(r, [1, 3])
+
+
class TestInserttable(unittest.TestCase):
"""Test inserttable method."""
@@ -1994,144 +2244,6 @@
self.assertIsInstance(r, bytes)
self.assertEqual(r, b'data')
- def testGetDictditer(self):
- dictiter = pg.get_dictiter()
- # error if a parameter is passed
- self.assertRaises(TypeError, pg.get_dictiter, dictiter)
- self.assertIs(dictiter, pg._dictiter) # the default setting
-
- def testSetDictiter(self):
- dictiter = pg.get_dictiter()
- self.assertTrue(callable(dictiter))
-
- query = self.c.query
-
- r = query("select 1 as x, 2 as y").dictiter()
- self.assertNotIsInstance(r, list)
- r = next(r)
- self.assertIsInstance(r, dict)
- self.assertEqual(r, dict(x=1, y=2))
-
- def listiter(q):
- for row in q:
- yield list(row)
-
- pg.set_dictiter(listiter)
- try:
- r = pg.get_dictiter()
- self.assertIs(r, listiter)
- r = query("select 1 as x, 2 as y").dictiter()
- self.assertNotIsInstance(r, list)
- r = next(r)
- self.assertIsInstance(r, list)
- self.assertEqual(r, [1, 2])
- self.assertNotIsInstance(r, dict)
- finally:
- pg.set_dictiter(dictiter)
-
- r = pg.get_dictiter()
- self.assertIs(r, dictiter)
-
- def testGetNamediter(self):
- namediter = pg.get_namediter()
- # error if a parameter is passed
- self.assertRaises(TypeError, pg.get_namediter, namediter)
- self.assertIs(namediter, pg._namediter) # the default setting
-
- def testSetNamediter(self):
- namediter = pg.get_namediter()
- self.assertTrue(callable(namediter))
-
- query = self.c.query
-
- r = query("select 1 as x, 2 as y").namediter()
- self.assertNotIsInstance(r, list)
- r = next(r)
- self.assertIsInstance(r, tuple)
- self.assertEqual(r, (1, 2))
- self.assertIsNot(type(r), tuple)
- self.assertEqual(r._fields, ('x', 'y'))
- self.assertEqual(r._asdict(), {'x': 1, 'y': 2})
- self.assertEqual(r.__class__.__name__, 'Row')
- r = query("select 1 as x, 2 as y").namedresult()
- self.assertIsInstance(r, list)
- r = r[0]
- self.assertIsInstance(r, tuple)
- self.assertEqual(r, (1, 2))
- self.assertIsNot(type(r), tuple)
- self.assertEqual(r._fields, ('x', 'y'))
- self.assertEqual(r._asdict(), {'x': 1, 'y': 2})
- self.assertEqual(r.__class__.__name__, 'Row')
-
- def listiter(q):
- for row in q:
- yield list(row)
-
- pg.set_namediter(listiter)
- try:
- r = pg.get_namediter()
- self.assertIs(r, listiter)
- r = query("select 1 as x, 2 as y").namediter()
- self.assertNotIsInstance(r, list)
- r = next(r)
- self.assertIsInstance(r, list)
- self.assertEqual(r, [1, 2])
- self.assertIsNot(type(r), tuple)
- self.assertFalse(hasattr(r, '_fields'))
- self.assertNotEqual(r.__class__.__name__, 'Row')
- r = query("select 1 as x, 2 as y").namedresult()
- self.assertIsInstance(r, list)
- r = r[0]
- self.assertIsInstance(r, list)
- self.assertEqual(r, [1, 2])
- self.assertIsNot(type(r), tuple)
- self.assertFalse(hasattr(r, '_fields'))
- self.assertNotEqual(r.__class__.__name__, 'Row')
- finally:
- pg.set_namediter(namediter)
-
- r = pg.get_namediter()
- self.assertIs(r, namediter)
-
- def testGetNamedresult(self): # deprecated
- namedresult = pg.get_namedresult()
- # error if a parameter is passed
- self.assertRaises(TypeError, pg.get_namedresult, namedresult)
- self.assertIs(namedresult, pg._namediter) # the default setting
-
- def testSetNamedresult(self): # deprecated
- namedresult = pg.get_namedresult()
- self.assertTrue(callable(namedresult))
-
- query = self.c.query
-
- r = query("select 1 as x, 2 as y").namedresult()[0]
- self.assertIsInstance(r, tuple)
- self.assertEqual(r, (1, 2))
- self.assertIsNot(type(r), tuple)
- self.assertEqual(r._fields, ('x', 'y'))
- self.assertEqual(r._asdict(), {'x': 1, 'y': 2})
- self.assertEqual(r.__class__.__name__, 'Row')
-
- def listresult(q):
- return [list(row) for row in q.getresult()]
-
- pg.set_namedresult(listresult)
- try:
- r = pg.get_namedresult()
- self.assertIs(r, listresult)
- r = query("select 1 as x, 2 as y").namedresult()[0]
- self.assertIsInstance(r, list)
- self.assertEqual(r, [1, 2])
- self.assertIsNot(type(r), tuple)
- self.assertFalse(hasattr(r, '_fields'))
- self.assertNotEqual(r.__class__.__name__, 'Row')
- finally:
- pg.set_namedresult(namedresult)
-
- r = pg.get_namedresult()
- self.assertIs(r, namedresult)
-
def testSetRowFactorySize(self):
try:
from functools import lru_cache
Modified: trunk/tests/test_classic_dbwrapper.py (979 => 980)
--- trunk/tests/test_classic_dbwrapper.py 2019-04-21 19:28:33 UTC (rev 979)
+++ trunk/tests/test_classic_dbwrapper.py 2019-04-22 14:44:58 UTC (rev 980)
@@ -4139,7 +4139,6 @@
cls.set_option('array', not_array)
not_bytea_escaped = not pg.get_bytea_escaped()
cls.set_option('bytea_escaped', not_bytea_escaped)
- cls.set_option('namedresult', None)
cls.set_option('jsondecode', None)
db = DB()
cls.regtypes = not db.use_regtypes()
@@ -4150,7 +4149,6 @@
def tearDownClass(cls):
super(TestDBClassNonStdOpts, cls).tearDownClass()
cls.reset_option('jsondecode')
- cls.reset_option('namedresult')
cls.reset_option('bool')
cls.reset_option('array')
cls.reset_option('bytea_escaped')
@@ -4167,7 +4165,7 @@
class TestDBClassAdapter(unittest.TestCase):
- """Test the adapter object associatd with the DB class."""
+ """Test the adapter object associated with the DB class."""
def setUp(self):
self.db = DB()
Modified: trunk/tests/test_classic_functions.py (979 => 980)
--- trunk/tests/test_classic_functions.py 2019-04-21 19:28:33 UTC (rev 979)
+++ trunk/tests/test_classic_functions.py 2019-04-22 14:44:58 UTC (rev 980)
@@ -1005,69 +1005,6 @@
self.assertIsInstance(r, bool)
self.assertIs(r, bytea_escaped)
- def testGetDictiter(self):
- r = pg.get_dictiter()
- self.assertTrue(callable(r))
- self.assertIs(r, pg._dictiter)
-
- def testSetDictiter(self):
- dictiter = pg.get_dictiter()
- try:
- pg.set_dictiter(None)
- r = pg.get_dictiter()
- self.assertIsNone(r)
- f = lambda q: q
- pg.set_dictiter(f)
- r = pg.get_dictiter()
- self.assertIs(r, f)
- self.assertRaises(TypeError, pg.set_dictiter, 'invalid')
- finally:
- pg.set_dictiter(dictiter)
- r = pg.get_dictiter()
- self.assertIs(r, dictiter)
-
- def testGetNamediter(self):
- r = pg.get_namediter()
- self.assertTrue(callable(r))
- self.assertIs(r, pg._namediter)
-
- def testSetNamediter(self):
- namediter = pg.get_namediter()
- try:
- pg.set_namediter(None)
- r = pg.get_namediter()
- self.assertIsNone(r)
- f = lambda q: q
- pg.set_namediter(f)
- r = pg.get_namediter()
- self.assertIs(r, f)
- self.assertRaises(TypeError, pg.set_namediter, 'invalid')
- finally:
- pg.set_namediter(namediter)
- r = pg.get_namediter()
- self.assertIs(r, namediter)
-
- def testGetNamedresult(self): # deprecated
- r = pg.get_namedresult()
- self.assertTrue(callable(r))
- self.assertIs(r, pg._namediter)
-
- def testSetNamedresult(self): # deprecated
- namedresult = pg.get_namedresult()
- try:
- pg.set_namedresult(None)
- r = pg.get_namedresult()
- self.assertIsNone(r)
- f = lambda q: q.getresult()
- pg.set_namedresult(f)
- r = pg.get_namedresult()
- self.assertIs(r, f)
- self.assertRaises(TypeError, pg.set_namedresult, 'invalid')
- finally:
- pg.set_namedresult(namedresult)
- r = pg.get_namedresult()
- self.assertIs(r, namedresult)
-
def testGetJsondecode(self):
r = pg.get_jsondecode()
self.assertTrue(callable(r))