All:
As we know, MySQL drops an idle connection once the wait timeout expires,
and Trac stops working after the connection has been broken. I made some
changes this morning to address this; here is a patch against Trac-1.0.1 as
installed from the yum repository.
Usage:
cd /usr/lib/python2.6/site-packages/Trac-1.0.1/
patch -p0 < /tmp/dbutils.trac.patch
Hope it's useful.
Regards.
--
You received this message because you are subscribed to the Google Groups "Trac
Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/trac-dev.
For more options, visit https://groups.google.com/groups/opt_out.
diff -Naur trac/db/tests/api.py ../trac/db/tests/api.py
--- trac/db/tests/api.py 2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/api.py 1970-01-01 08:00:00.000000000 +0800
@@ -1,355 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from __future__ import with_statement
-
-import os
-import unittest
-
-from trac.db.api import DatabaseManager, _parse_db_str, get_column_names, \
- with_transaction
-from trac.test import EnvironmentStub, Mock
-from trac.util.concurrency import ThreadLocal
-
-
-class Connection(object):
-
- committed = False
- rolledback = False
-
- def commit(self):
- self.committed = True
-
- def rollback(self):
- self.rolledback = True
-
-
-class Error(Exception):
- pass
-
-
-def make_env(get_cnx):
- return Mock(components={DatabaseManager:
- Mock(get_connection=get_cnx,
- _transaction_local=ThreadLocal(wdb=None, rdb=None))})
-
-
-class WithTransactionTest(unittest.TestCase):
-
- def test_successful_transaction(self):
- db = Connection()
- env = make_env(lambda: db)
- @with_transaction(env)
- def do_transaction(db):
- self.assertTrue(not db.committed and not db.rolledback)
- self.assertTrue(db.committed and not db.rolledback)
-
- def test_failed_transaction(self):
- db = Connection()
- env = make_env(lambda: db)
- try:
- @with_transaction(env)
- def do_transaction(db):
- self.assertTrue(not db.committed and not db.rolledback)
- raise Error()
- self.fail()
- except Error:
- pass
- self.assertTrue(not db.committed and db.rolledback)
-
- def test_implicit_nesting_success(self):
- env = make_env(Connection)
- dbs = [None, None]
- @with_transaction(env)
- def level0(db):
- dbs[0] = db
- @with_transaction(env)
- def level1(db):
- dbs[1] = db
- self.assertTrue(not db.committed and not db.rolledback)
- self.assertTrue(not db.committed and not db.rolledback)
- self.assertTrue(dbs[0] is not None)
- self.assertTrue(dbs[0] is dbs[1])
- self.assertTrue(dbs[0].committed and not dbs[0].rolledback)
-
- def test_implicit_nesting_failure(self):
- env = make_env(Connection)
- dbs = [None, None]
- try:
- @with_transaction(env)
- def level0(db):
- dbs[0] = db
- try:
- @with_transaction(env)
- def level1(db):
- dbs[1] = db
- self.assertTrue(not db.committed and not db.rolledback)
- raise Error()
- self.fail()
- except Error:
- self.assertTrue(not db.committed and not db.rolledback)
- raise
- self.fail()
- except Error:
- pass
- self.assertTrue(dbs[0] is not None)
- self.assertTrue(dbs[0] is dbs[1])
- self.assertTrue(not dbs[0].committed and dbs[0].rolledback)
-
- def test_explicit_success(self):
- db = Connection()
- env = make_env(lambda: None)
- @with_transaction(env, db)
- def do_transaction(idb):
- self.assertTrue(idb is db)
- self.assertTrue(not db.committed and not db.rolledback)
- self.assertTrue(not db.committed and not db.rolledback)
-
- def test_explicit_failure(self):
- db = Connection()
- env = make_env(lambda: None)
- try:
- @with_transaction(env, db)
- def do_transaction(idb):
- self.assertTrue(idb is db)
- self.assertTrue(not db.committed and not db.rolledback)
- raise Error()
- self.fail()
- except Error:
- pass
- self.assertTrue(not db.committed and not db.rolledback)
-
- def test_implicit_in_explicit_success(self):
- db = Connection()
- env = make_env(lambda: db)
- dbs = [None, None]
- @with_transaction(env, db)
- def level0(db):
- dbs[0] = db
- @with_transaction(env)
- def level1(db):
- dbs[1] = db
- self.assertTrue(not db.committed and not db.rolledback)
- self.assertTrue(not db.committed and not db.rolledback)
- self.assertTrue(dbs[0] is not None)
- self.assertTrue(dbs[0] is dbs[1])
- self.assertTrue(not dbs[0].committed and not dbs[0].rolledback)
-
- def test_implicit_in_explicit_failure(self):
- db = Connection()
- env = make_env(lambda: db)
- dbs = [None, None]
- try:
- @with_transaction(env, db)
- def level0(db):
- dbs[0] = db
- @with_transaction(env)
- def level1(db):
- dbs[1] = db
- self.assertTrue(not db.committed and not db.rolledback)
- raise Error()
- self.fail()
- self.fail()
- except Error:
- pass
- self.assertTrue(dbs[0] is not None)
- self.assertTrue(dbs[0] is dbs[1])
- self.assertTrue(not dbs[0].committed and not dbs[0].rolledback)
-
- def test_explicit_in_implicit_success(self):
- db = Connection()
- env = make_env(lambda: db)
- dbs = [None, None]
- @with_transaction(env)
- def level0(db):
- dbs[0] = db
- @with_transaction(env, db)
- def level1(db):
- dbs[1] = db
- self.assertTrue(not db.committed and not db.rolledback)
- self.assertTrue(not db.committed and not db.rolledback)
- self.assertTrue(dbs[0] is not None)
- self.assertTrue(dbs[0] is dbs[1])
- self.assertTrue(dbs[0].committed and not dbs[0].rolledback)
-
- def test_explicit_in_implicit_failure(self):
- db = Connection()
- env = make_env(lambda: db)
- dbs = [None, None]
- try:
- @with_transaction(env)
- def level0(db):
- dbs[0] = db
- @with_transaction(env, db)
- def level1(db):
- dbs[1] = db
- self.assertTrue(not db.committed and not db.rolledback)
- raise Error()
- self.fail()
- self.fail()
- except Error:
- pass
- self.assertTrue(dbs[0] is not None)
- self.assertTrue(dbs[0] is dbs[1])
- self.assertTrue(not dbs[0].committed and dbs[0].rolledback)
-
- def test_invalid_nesting(self):
- env = make_env(Connection)
- try:
- @with_transaction(env)
- def level0(db):
- @with_transaction(env, Connection())
- def level1(db):
- raise Error()
- raise Error()
- raise Error()
- except AssertionError:
- pass
-
-
-
-class ParseConnectionStringTestCase(unittest.TestCase):
-
- def test_sqlite_relative(self):
- # Default syntax for specifying DB path relative to the environment
- # directory
- self.assertEqual(('sqlite', {'path': 'db/trac.db'}),
- _parse_db_str('sqlite:db/trac.db'))
-
- def test_sqlite_absolute(self):
- # Standard syntax
- self.assertEqual(('sqlite', {'path': '/var/db/trac.db'}),
- _parse_db_str('sqlite:///var/db/trac.db'))
- # Legacy syntax
- self.assertEqual(('sqlite', {'path': '/var/db/trac.db'}),
- _parse_db_str('sqlite:/var/db/trac.db'))
-
- def test_sqlite_with_timeout_param(self):
- # In-memory database
- self.assertEqual(('sqlite', {'path': 'db/trac.db',
- 'params': {'timeout': '10000'}}),
- _parse_db_str('sqlite:db/trac.db?timeout=10000'))
-
- def test_sqlite_windows_path(self):
- # In-memory database
- os_name = os.name
- try:
- os.name = 'nt'
- self.assertEqual(('sqlite', {'path': 'C:/project/db/trac.db'}),
- _parse_db_str('sqlite:C|/project/db/trac.db'))
- finally:
- os.name = os_name
-
- def test_postgres_simple(self):
- self.assertEqual(('postgres', {'host': 'localhost', 'path': '/trac'}),
- _parse_db_str('postgres://localhost/trac'))
-
- def test_postgres_with_port(self):
- self.assertEqual(('postgres', {'host': 'localhost', 'port': 9431,
- 'path': '/trac'}),
- _parse_db_str('postgres://localhost:9431/trac'))
-
- def test_postgres_with_creds(self):
- self.assertEqual(('postgres', {'user': 'john', 'password': 'letmein',
- 'host': 'localhost', 'port': 9431,
- 'path': '/trac'}),
- _parse_db_str('postgres://john:letmein@localhost:9431/trac'))
-
- def test_postgres_with_quoted_password(self):
- self.assertEqual(('postgres', {'user': 'john', 'password': ':@/',
- 'host': 'localhost', 'path': '/trac'}),
- _parse_db_str('postgres://john:%3a%40%2f@localhost/trac'))
-
- def test_mysql_simple(self):
- self.assertEqual(('mysql', {'host': 'localhost', 'path': '/trac'}),
- _parse_db_str('mysql://localhost/trac'))
-
- def test_mysql_with_creds(self):
- self.assertEqual(('mysql', {'user': 'john', 'password': 'letmein',
- 'host': 'localhost', 'port': 3306,
- 'path': '/trac'}),
- _parse_db_str('mysql://john:letmein@localhost:3306/trac'))
-
-
-class StringsTestCase(unittest.TestCase):
-
- def setUp(self):
- self.env = EnvironmentStub()
-
- def test_insert_unicode(self):
- self.env.db_transaction(
- "INSERT INTO system (name,value) VALUES (%s,%s)",
- ('test-unicode', u'?nic?de'))
- self.assertEqual([(u'?nic?de',)], self.env.db_query(
- "SELECT value FROM system WHERE name='test-unicode'"))
-
- def test_insert_empty(self):
- from trac.util.text import empty
- self.env.db_transaction(
- "INSERT INTO system (name,value) VALUES (%s,%s)",
- ('test-empty', empty))
- self.assertEqual([(u'',)], self.env.db_query(
- "SELECT value FROM system WHERE name='test-empty'"))
-
- def test_insert_markup(self):
- from genshi.core import Markup
- self.env.db_transaction(
- "INSERT INTO system (name,value) VALUES (%s,%s)",
- ('test-markup', Markup(u'<em>m?rkup</em>')))
- self.assertEqual([(u'<em>m?rkup</em>',)], self.env.db_query(
- "SELECT value FROM system WHERE name='test-markup'"))
-
- def test_quote(self):
- db = self.env.get_db_cnx()
- cursor = db.cursor()
- cursor.execute('SELECT 1 AS %s' % \
- db.quote(r'alpha\`\"\'\\beta``gamma""delta'))
- self.assertEqual(r'alpha\`\"\'\\beta``gamma""delta',
- get_column_names(cursor)[0])
-
-
-class ConnectionTestCase(unittest.TestCase):
- def setUp(self):
- self.env = EnvironmentStub()
-
- def tearDown(self):
- self.env.reset_db()
-
- def test_get_last_id(self):
- id1 = id2 = None
- q = "INSERT INTO report (author) VALUES ('anonymous')"
- with self.env.db_transaction as db:
- cursor = db.cursor()
- cursor.execute(q)
- # Row ID correct before...
- id1 = db.get_last_id(cursor, 'report')
- self.assertNotEqual(0, id1)
- db.commit()
- cursor.execute(q)
- # ... and after commit()
- db.commit()
- id2 = db.get_last_id(cursor, 'report')
- self.assertEqual(id1 + 1, id2)
-
- def test_update_sequence(self):
- self.env.db_transaction(
- "INSERT INTO report (id, author) VALUES (42, 'anonymous')")
- with self.env.db_transaction as db:
- cursor = db.cursor()
- db.update_sequence(cursor, 'report', 'id')
- self.env.db_transaction(
- "INSERT INTO report (author) VALUES ('next-id')")
- self.assertEqual(43, self.env.db_query(
- "SELECT id FROM report WHERE author='next-id'")[0][0])
-
-
-def suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(ParseConnectionStringTestCase, 'test'))
- suite.addTest(unittest.makeSuite(StringsTestCase, 'test'))
- suite.addTest(unittest.makeSuite(ConnectionTestCase, 'test'))
- suite.addTest(unittest.makeSuite(WithTransactionTest, 'test'))
- return suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest='suite')
diff -Naur trac/db/tests/functional.py ../trac/db/tests/functional.py
--- trac/db/tests/functional.py 2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/functional.py 1970-01-01 08:00:00.000000000 +0800
@@ -1,29 +0,0 @@
-#!/usr/bin/python
-
-import os
-from trac.tests.functional import *
-
-
-class DatabaseBackupTestCase(FunctionalTestCaseSetup):
- def runTest(self):
- """Testing backup"""
- env = self._testenv.get_trac_environment()
- # raises TracError if backup fails
- backup_file = env.backup()
- self.assertTrue(os.path.exists(backup_file),
- 'Backup file was not created.')
- self.assertNotEqual(os.path.getsize(backup_file), 0,
- 'Backup file is zero length.')
-
-
-def functionalSuite(suite=None):
- if not suite:
- import trac.tests.functional.testcases
- suite = trac.tests.functional.testcases.functionalSuite()
- suite.addTest(DatabaseBackupTestCase())
- return suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest='functionalSuite')
-
diff -Naur trac/db/tests/__init__.py ../trac/db/tests/__init__.py
--- trac/db/tests/__init__.py 2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/__init__.py 1970-01-01 08:00:00.000000000 +0800
@@ -1,18 +0,0 @@
-import unittest
-
-from trac.db.tests import api, mysql_test, postgres_test, util
-
-from trac.db.tests.functional import functionalSuite
-
-def suite():
-
- suite = unittest.TestSuite()
- suite.addTest(api.suite())
- suite.addTest(mysql_test.suite())
- suite.addTest(postgres_test.suite())
- suite.addTest(util.suite())
- return suite
-
-if __name__ == '__main__':
- unittest.main(defaultTest='suite')
-
diff -Naur trac/db/tests/mysql_test.py ../trac/db/tests/mysql_test.py
--- trac/db/tests/mysql_test.py 2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/mysql_test.py 1970-01-01 08:00:00.000000000 +0800
@@ -1,61 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2009 Edgewall Software
-# All rights reserved.
-#
-# This software is licensed as described in the file COPYING, which
-# you should have received as part of this distribution. The terms
-# are also available at http://trac.edgewall.org/wiki/TracLicense.
-#
-# This software consists of voluntary contributions made by many
-# individuals. For the exact contribution history, see the revision
-# history and logs, available at http://trac.edgewall.org/log/.
-
-import unittest
-
-from trac.db.mysql_backend import MySQLConnector
-from trac.test import EnvironmentStub
-
-
-class MySQLTableAlterationSQLTest(unittest.TestCase):
- def setUp(self):
- self.env = EnvironmentStub()
-
- def test_alter_column_types(self):
- connector = MySQLConnector(self.env)
- sql = connector.alter_column_types('milestone',
- {'due': ('int', 'int64'),
- 'completed': ('int', 'int64')})
- sql = list(sql)
- self.assertEqual([
- "ALTER TABLE milestone "
- "MODIFY completed bigint, "
- "MODIFY due bigint",
- ], sql)
-
- def test_alter_column_types_same(self):
- connector = MySQLConnector(self.env)
- sql = connector.alter_column_types('milestone',
- {'due': ('int', 'int'),
- 'completed': ('int', 'int64')})
- sql = list(sql)
- self.assertEqual([
- "ALTER TABLE milestone "
- "MODIFY completed bigint",
- ], sql)
-
- def test_alter_column_types_none(self):
- connector = MySQLConnector(self.env)
- sql = connector.alter_column_types('milestone',
- {'due': ('int', 'int')})
- self.assertEqual([], list(sql))
-
-
-def suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(MySQLTableAlterationSQLTest, 'test'))
- return suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest='suite')
diff -Naur trac/db/tests/postgres_test.py ../trac/db/tests/postgres_test.py
--- trac/db/tests/postgres_test.py 2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/postgres_test.py 1970-01-01 08:00:00.000000000 +0800
@@ -1,158 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import re
-import unittest
-
-from trac.db import Table, Column, Index
-from trac.db.postgres_backend import PostgreSQLConnector, assemble_pg_dsn
-from trac.test import EnvironmentStub
-
-
-class PostgresTableCreationSQLTest(unittest.TestCase):
- def setUp(self):
- self.env = EnvironmentStub()
-
- def _unroll_generator(self, generator):
- items = []
- for item in generator:
- items.append(item)
- return items
-
- def _normalize_sql(self, sql_generator):
- normalized_commands = []
- whitespace_regex = re.compile(r'\s+')
- commands = self._unroll_generator(sql_generator)
- for command in commands:
- command = command.replace('\n', '')
- command = whitespace_regex.sub(' ', command)
- normalized_commands.append(command)
- return normalized_commands
-
- def test_quote_table_name(self):
- table = Table('foo bar')
- table[Column('name'),]
- sql_generator = PostgreSQLConnector(self.env).to_sql(table)
- sql_commands = self._normalize_sql(sql_generator)
- self.assertEqual(1, len(sql_commands))
- self.assertEqual('CREATE TABLE "foo bar" ( "name" text)',
- sql_commands[0])
-
- def test_quote_column_names(self):
- table = Table('foo')
- table[Column('my name'),]
- sql_generator = PostgreSQLConnector(self.env).to_sql(table)
- sql_commands = self._normalize_sql(sql_generator)
- self.assertEqual(1, len(sql_commands))
- self.assertEqual('CREATE TABLE "foo" ( "my name" text)',
- sql_commands[0])
-
- def test_quote_compound_primary_key_declaration(self):
- table = Table('foo bar', key=['my name', 'your name'])
- table[Column('my name'), Column('your name'),]
- sql_generator = PostgreSQLConnector(self.env).to_sql(table)
- sql_commands = self._normalize_sql(sql_generator)
- self.assertEqual(1, len(sql_commands))
- expected_sql = 'CREATE TABLE "foo bar" ( "my name" text, ' + \
- '"your name" text, CONSTRAINT "foo bar_pk" ' +\
- 'PRIMARY KEY ("my name","your name"))'
- self.assertEqual(expected_sql, sql_commands[0])
-
- def test_quote_index_declaration(self):
- table = Table('foo')
- table[Column('my name'), Index(['my name'])]
- sql_generator = PostgreSQLConnector(self.env).to_sql(table)
- sql_commands = self._normalize_sql(sql_generator)
- self.assertEqual(2, len(sql_commands))
- self.assertEqual('CREATE TABLE "foo" ( "my name" text)',
- sql_commands[0])
- index_sql = 'CREATE INDEX "foo_my name_idx" ON "foo" ("my name")'
- self.assertEqual(index_sql, sql_commands[1])
-
- def test_quote_index_declaration_for_multiple_indexes(self):
- table = Table('foo')
- table[Column('a'), Column('b'),
- Index(['a', 'b'])]
- sql_generator = PostgreSQLConnector(self.env).to_sql(table)
- sql_commands = self._normalize_sql(sql_generator)
- self.assertEqual(2, len(sql_commands))
- self.assertEqual('CREATE TABLE "foo" ( "a" text, "b" text)',
- sql_commands[0])
- index_sql = 'CREATE INDEX "foo_a_b_idx" ON "foo" ("a","b")'
- self.assertEqual(index_sql, sql_commands[1])
-
- def test_assemble_dsn(self):
- values = [
- {'path': 't', 'user': 't'},
- {'path': 't', 'password': 't'},
- {'path': 't', 'host': 't'},
- {'path': 't', 'port': 't'},
- {'path': 't', 'password': 't', 'user': 't'},
- {'path': 't', 'host': 't', 'user': 't'},
- {'path': 't', 'user': 't', 'port': 't'},
- {'path': 't', 'host': 't', 'password': 't'},
- {'path': 't', 'password': 't', 'port': 't'},
- {'path': 't', 'host': 't', 'port': 't'},
- {'path': 't', 'host': 't', 'password': 't', 'user': 't'},
- {'path': 't', 'password': 't', 'user': 't', 'port': 't'},
- {'path': 't', 'host': 't', 'user': 't', 'port': 't'},
- {'path': 't', 'host': 't', 'password': 't', 'port': 't'},
- ]
- for orig in values:
- dsn = assemble_pg_dsn(**orig)
- for k, v in orig.iteritems():
- orig[k] = "'%s'" % v
- continue
- orig['dbname'] = "'t'"
- del orig['path']
- new_values = {'dbname': "'t'"}
- for key_value in dsn.split(' '):
- k, v = key_value.split('=')
- new_values[k] = v
- continue
- self.assertEqual(new_values, orig)
- continue
-
-
-class PostgresTableAlterationSQLTest(unittest.TestCase):
- def setUp(self):
- self.env = EnvironmentStub()
-
- def test_alter_column_types(self):
- connector = PostgreSQLConnector(self.env)
- sql = connector.alter_column_types('milestone',
- {'due': ('int', 'int64'),
- 'completed': ('int', 'int64')})
- sql = list(sql)
- self.assertEqual([
- "ALTER TABLE milestone "
- "ALTER COLUMN completed TYPE bigint, "
- "ALTER COLUMN due TYPE bigint",
- ], sql)
-
- def test_alter_column_types_same(self):
- connector = PostgreSQLConnector(self.env)
- sql = connector.alter_column_types('milestone',
- {'due': ('int', 'int'),
- 'completed': ('int', 'int64')})
- sql = list(sql)
- self.assertEqual([
- "ALTER TABLE milestone "
- "ALTER COLUMN completed TYPE bigint",
- ], sql)
-
- def test_alter_column_types_none(self):
- connector = PostgreSQLConnector(self.env)
- sql = connector.alter_column_types('milestone',
- {'due': ('int', 'int')})
- self.assertEqual([], list(sql))
-
-
-def suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(PostgresTableCreationSQLTest, 'test'))
- suite.addTest(unittest.makeSuite(PostgresTableAlterationSQLTest, 'test'))
- return suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest='suite')
diff -Naur trac/db/tests/util.py ../trac/db/tests/util.py
--- trac/db/tests/util.py 2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/util.py 1970-01-01 08:00:00.000000000 +0800
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2010 Edgewall Software
-# All rights reserved.
-#
-# This software is licensed as described in the file COPYING, which
-# you should have received as part of this distribution. The terms
-# are also available at http://trac.edgewall.org/wiki/TracLicense.
-#
-# This software consists of voluntary contributions made by many
-# individuals. For the exact contribution history, see the revision
-# history and logs, available at http://trac.edgewall.org/log/.
-
-import unittest
-
-from trac.db.util import sql_escape_percent
-
-# TODO: test IterableCursor, ConnectionWrapper
-
-class SQLEscapeTestCase(unittest.TestCase):
- def test_sql_escape_percent(self):
- self.assertEqual("%", sql_escape_percent("%"))
- self.assertEqual("'%%'", sql_escape_percent("'%'"))
- self.assertEqual("''%''", sql_escape_percent("''%''"))
- self.assertEqual("'''%%'''", sql_escape_percent("'''%'''"))
- self.assertEqual("'''%%'", sql_escape_percent("'''%'"))
- self.assertEqual("%s", sql_escape_percent("%s"))
- self.assertEqual("% %", sql_escape_percent("% %"))
- self.assertEqual("%s %i", sql_escape_percent("%s %i"))
- self.assertEqual("'%%s'", sql_escape_percent("'%s'"))
- self.assertEqual("'%% %%'", sql_escape_percent("'% %'"))
- self.assertEqual("'%%s %%i'", sql_escape_percent("'%s %i'"))
-
-
-def suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(SQLEscapeTestCase, 'test'))
- return suite
-
-if __name__ == '__main__':
- unittest.main(defaultTest='suite')
diff -Naur trac/db/util.py ../trac/db/util.py
--- trac/db/util.py 2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/util.py 2013-08-27 12:17:32.795244987 +0800
@@ -86,6 +86,7 @@
return self.cursor.executemany(sql, args)
+
class ConnectionWrapper(object):
"""Generic wrapper around connection objects.
@@ -97,6 +98,12 @@
"""
__slots__ = ('cnx', 'log', 'readonly')
+ def commit(self):
+ return self.cnx.connection().cursor().execute("commit")
+
+ def rollback(self):
+ return self.cnx.connection().cursor().execute("rollback")
+
def __init__(self, cnx, log=None, readonly=False):
self.cnx = cnx
self.log = log
@@ -153,3 +160,10 @@
if self.readonly and not dql:
raise ValueError("a 'readonly' connection can only do a SELECT")
return dql
+
+try:
+ from DBUtils.PooledDB import PooledDB as connect
+except ImportError:
+ def connect(dbapi, maxusage=0, setsession=None, *args, **kwargs):
+ return dbapi.connect(*args, **kwargs)
+