Author: darcy
Date: Sat Jan 5 10:19:07 2013
New Revision: 486
Log:
Open database in every test to assure that there are no interactions.
Add tests for pgnotify.
Modified:
trunk/module/TEST_PyGreSQL_classic.py
Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py Sat Jan 5 10:07:57 2013
(r485)
+++ trunk/module/TEST_PyGreSQL_classic.py Sat Jan 5 10:19:07 2013
(r486)
@@ -2,6 +2,7 @@
from __future__ import with_statement
+import sys, thread, time
import unittest
from pg import *
@@ -16,17 +17,27 @@
except ImportError:
pass
-db = DB(dbname, dbhost, dbport)
-db.query("SET DATESTYLE TO 'ISO'")
-db.query("SET TIME ZONE 'EST5EDT'")
-db.query("SET DEFAULT_WITH_OIDS=TRUE")
-db.query("SET STANDARD_CONFORMING_STRINGS=FALSE")
-
+def opendb():
+ db = DB(dbname, dbhost, dbport)
+ db.query("SET DATESTYLE TO 'ISO'")
+ db.query("SET TIME ZONE 'EST5EDT'")
+ db.query("SET DEFAULT_WITH_OIDS=TRUE")
+ db.query("SET STANDARD_CONFORMING_STRINGS=FALSE")
+ return db
+
+def cb1(arg_dict):
+ global cb1_return
+ if arg_dict is None:
+ cb1_return = 'timed out'
+ else:
+ cb1_return = arg_dict
class UtilityTest(unittest.TestCase):
def setUp(self):
"""Setup test tables or empty them if they already exist."""
+ db = opendb()
+
for t in ('_test1', '_test2'):
try:
db.query("CREATE SCHEMA " + t)
@@ -50,10 +61,12 @@
def test_invalidname(self):
"""Make sure that invalid table names are caught"""
+ db = opendb()
self.failUnlessRaises(ProgrammingError, db.get_attnames, 'x.y.z')
def test_schema(self):
"""Does it differentiate the same table name in different schemas"""
+ db = opendb()
# see if they differentiate the table names properly
self.assertEqual(
db.get_attnames('_test_schema'),
@@ -73,6 +86,7 @@
)
def test_pkey(self):
+ db = opendb()
self.assertEqual(db.pkey('_test_schema'), '_test')
self.assertEqual(db.pkey('public._test_schema'), '_test')
self.assertEqual(db.pkey('_test1._test_schema'), '_test1')
@@ -84,6 +98,7 @@
self.assertEqual(db.pkey('public.test1'), 'a')
def test_get(self):
+ db = opendb()
db.query("INSERT INTO _test_schema VALUES (1234)")
db.get('_test_schema', 1234)
db.get('_test_schema', 1234, keyname='_test')
@@ -91,11 +106,13 @@
db.get('_test_vschema', 1234, keyname='_test')
def test_params(self):
+ db = opendb()
db.query("INSERT INTO _test_schema VALUES ($1, $2, $3)", 12, None, 34)
d = db.get('_test_schema', 12)
self.assertEqual(d['dvar'], 34)
def test_insert(self):
+ db = opendb()
d = dict(_test=1234)
db.insert('_test_schema', d)
self.assertEqual(d['dvar'], 999)
@@ -103,6 +120,7 @@
self.assertEqual(d['dvar'], 999)
def test_context_manager(self):
+ db = opendb()
t = '_test_schema'
d = dict(_test=1235)
with db:
@@ -128,6 +146,7 @@
self.assertTrue(db.get(t, 1239))
def test_sqlstate(self):
+ db = opendb()
db.query("INSERT INTO _test_schema VALUES (1234)")
try:
db.query("INSERT INTO _test_schema VALUES (1234)")
@@ -138,6 +157,7 @@
self.assertEqual(error.sqlstate, '23505')
def test_mixed_case(self):
+ db = opendb()
try:
db.query('CREATE TABLE _test_mc ("_Test" int PRIMARY KEY)')
except Error:
@@ -146,6 +166,7 @@
db.insert('_test_mc', d)
def test_update(self):
+ db = opendb()
db.query("INSERT INTO _test_schema VALUES (1234)")
r = db.get('_test_schema', 1234)
@@ -165,6 +186,7 @@
self.assertEqual(r['dvar'], 456)
def test_quote(self):
+ db = opendb()
q = db._quote
self.assertEqual(q(0, 'int'), "0")
self.assertEqual(q(0, 'num'), "0")
@@ -204,6 +226,79 @@
self.assertEqual(q("'", 'text'), "''''")
self.assertEqual(q("\\", 'text'), "'\\\\'")
+ # note that notify can be created as part of the DB class or
+ # independently.
+
+ def test_notify_DB(self):
+ global cb1_return
+
+ db = opendb()
+ db2 = opendb()
+ # Listen for 'event_1'
+ pgn = db2.pgnotify('event_1', cb1)
+ thread.start_new_thread(pgn, ())
+ time.sleep(1)
+ # Generate notification from the other connection.
+ db.query('notify event_1')
+ time.sleep(1)
+ # Check that callback has been invoked.
+ self.assertEquals(cb1_return['event'], 'event_1')
+
+ def test_notify_timeout_DB(self):
+ db = opendb()
+ db2 = opendb()
+ global cb1_return
+ # Listen for 'event_1'
+ pgn = db2.pgnotify('event_1', cb1, {}, 1)
+ thread.start_new_thread(pgn, ())
+ # Sleep long enough to time out.
+ time.sleep(2)
+ # Verify that we've indeed timed out.
+ self.assertEquals(cb1_return, 'timed out')
+
+ def test_notify(self):
+ db = opendb()
+ db2 = opendb()
+ global cb1_return
+ # Listen for 'event_1'
+ pgn = pgnotify(db2, 'event_1', cb1)
+ thread.start_new_thread(pgn, ())
+ time.sleep(1)
+ # Generate notification from the other connection.
+ db.query('notify event_1')
+ time.sleep(1)
+ # Check that callback has been invoked.
+ self.assertEquals(cb1_return['event'], 'event_1')
+
+ def test_notify_timeout(self):
+ db = opendb()
+ db2 = opendb()
+ global cb1_return
+ # Listen for 'event_1'
+ pgn = pgnotify(db2, 'event_1', cb1, {}, 1)
+ thread.start_new_thread(pgn, ())
+ # Sleep long enough to time out.
+ time.sleep(2)
+ # Verify that we've indeed timed out.
+ self.assertEquals(cb1_return, 'timed out')
if __name__ == '__main__':
- unittest.main()
+ suite = unittest.TestSuite()
+
+ if len(sys.argv) > 1: test_list = sys.argv[1:]
+ else: test_list = unittest.getTestCaseNames(UtilityTest, 'test_')
+
+ if len(sys.argv) == 2 and sys.argv[1] == '-l':
+ print '\n'.join(unittest.getTestCaseNames(UtilityTest, 'test_'))
+ sys.exit(1)
+
+ for test_name in test_list:
+ try:
+ suite.addTest(UtilityTest(test_name))
+ except:
+ print "\n ERROR: %s.\n" % sys.exc_value
+ sys.exit(1)
+
+ rc = unittest.TextTestRunner(verbosity=1).run(suite)
+ sys.exit(len(rc.errors+rc.failures) != 0)
+
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql