All:

    We know mysql will kick off the idle connection after a wait timeout, 
Trac will not work any more after the connection were broken, So I made 
some change this morning, here is a patch for Trac-1.0.1 which  get  from 
yum repository.
 
    Usage: 

    cd /usr/lib/python2.6/site-package/Trac-1.0.1/
    patch -p0 < /tmp/dbutils.trac.patch

    hope it's usefull.

Regards.

-- 
You received this message because you are subscribed to the Google Groups "Trac 
Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/trac-dev.
For more options, visit https://groups.google.com/groups/opt_out.
diff -Naur trac/db/tests/api.py ../trac/db/tests/api.py
--- trac/db/tests/api.py	2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/api.py	1970-01-01 08:00:00.000000000 +0800
@@ -1,355 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from __future__ import with_statement
-
-import os
-import unittest
-
-from trac.db.api import DatabaseManager, _parse_db_str, get_column_names, \
-                        with_transaction
-from trac.test import EnvironmentStub, Mock
-from trac.util.concurrency import ThreadLocal
-
-
-class Connection(object):
-
-    committed = False
-    rolledback = False
-
-    def commit(self):
-        self.committed = True
-
-    def rollback(self):
-        self.rolledback = True
-
-
-class Error(Exception):
-    pass
-
-
-def make_env(get_cnx):
-    return Mock(components={DatabaseManager:
-             Mock(get_connection=get_cnx,
-                  _transaction_local=ThreadLocal(wdb=None, rdb=None))})
-
-
-class WithTransactionTest(unittest.TestCase):
-
-    def test_successful_transaction(self):
-        db = Connection()
-        env = make_env(lambda: db)
-        @with_transaction(env)
-        def do_transaction(db):
-            self.assertTrue(not db.committed and not db.rolledback)
-        self.assertTrue(db.committed and not db.rolledback)
-
-    def test_failed_transaction(self):
-        db = Connection()
-        env = make_env(lambda: db)
-        try:
-            @with_transaction(env)
-            def do_transaction(db):
-                self.assertTrue(not db.committed and not db.rolledback)
-                raise Error()
-            self.fail()
-        except Error:
-            pass
-        self.assertTrue(not db.committed and db.rolledback)
-
-    def test_implicit_nesting_success(self):
-        env = make_env(Connection)
-        dbs = [None, None]
-        @with_transaction(env)
-        def level0(db):
-            dbs[0] = db
-            @with_transaction(env)
-            def level1(db):
-                dbs[1] = db
-                self.assertTrue(not db.committed and not db.rolledback)
-            self.assertTrue(not db.committed and not db.rolledback)
-        self.assertTrue(dbs[0] is not None)
-        self.assertTrue(dbs[0] is dbs[1])
-        self.assertTrue(dbs[0].committed and not dbs[0].rolledback)
-
-    def test_implicit_nesting_failure(self):
-        env = make_env(Connection)
-        dbs = [None, None]
-        try:
-            @with_transaction(env)
-            def level0(db):
-                dbs[0] = db
-                try:
-                    @with_transaction(env)
-                    def level1(db):
-                        dbs[1] = db
-                        self.assertTrue(not db.committed and not db.rolledback)
-                        raise Error()
-                    self.fail()
-                except Error:
-                    self.assertTrue(not db.committed and not db.rolledback)
-                    raise
-            self.fail()
-        except Error:
-            pass
-        self.assertTrue(dbs[0] is not None)
-        self.assertTrue(dbs[0] is dbs[1])
-        self.assertTrue(not dbs[0].committed and dbs[0].rolledback)
-
-    def test_explicit_success(self):
-        db = Connection()
-        env = make_env(lambda: None)
-        @with_transaction(env, db)
-        def do_transaction(idb):
-            self.assertTrue(idb is db)
-            self.assertTrue(not db.committed and not db.rolledback)
-        self.assertTrue(not db.committed and not db.rolledback)
-
-    def test_explicit_failure(self):
-        db = Connection()
-        env = make_env(lambda: None)
-        try:
-            @with_transaction(env, db)
-            def do_transaction(idb):
-                self.assertTrue(idb is db)
-                self.assertTrue(not db.committed and not db.rolledback)
-                raise Error()
-            self.fail()
-        except Error:
-            pass
-        self.assertTrue(not db.committed and not db.rolledback)
-
-    def test_implicit_in_explicit_success(self):
-        db = Connection()
-        env = make_env(lambda: db)
-        dbs = [None, None]
-        @with_transaction(env, db)
-        def level0(db):
-            dbs[0] = db
-            @with_transaction(env)
-            def level1(db):
-                dbs[1] = db
-                self.assertTrue(not db.committed and not db.rolledback)
-            self.assertTrue(not db.committed and not db.rolledback)
-        self.assertTrue(dbs[0] is not None)
-        self.assertTrue(dbs[0] is dbs[1])
-        self.assertTrue(not dbs[0].committed and not dbs[0].rolledback)
-
-    def test_implicit_in_explicit_failure(self):
-        db = Connection()
-        env = make_env(lambda: db)
-        dbs = [None, None]
-        try:
-            @with_transaction(env, db)
-            def level0(db):
-                dbs[0] = db
-                @with_transaction(env)
-                def level1(db):
-                    dbs[1] = db
-                    self.assertTrue(not db.committed and not db.rolledback)
-                    raise Error()
-                self.fail()
-            self.fail()
-        except Error:
-            pass
-        self.assertTrue(dbs[0] is not None)
-        self.assertTrue(dbs[0] is dbs[1])
-        self.assertTrue(not dbs[0].committed and not dbs[0].rolledback)
-
-    def test_explicit_in_implicit_success(self):
-        db = Connection()
-        env = make_env(lambda: db)
-        dbs = [None, None]
-        @with_transaction(env)
-        def level0(db):
-            dbs[0] = db
-            @with_transaction(env, db)
-            def level1(db):
-                dbs[1] = db
-                self.assertTrue(not db.committed and not db.rolledback)
-            self.assertTrue(not db.committed and not db.rolledback)
-        self.assertTrue(dbs[0] is not None)
-        self.assertTrue(dbs[0] is dbs[1])
-        self.assertTrue(dbs[0].committed and not dbs[0].rolledback)
-
-    def test_explicit_in_implicit_failure(self):
-        db = Connection()
-        env = make_env(lambda: db)
-        dbs = [None, None]
-        try:
-            @with_transaction(env)
-            def level0(db):
-                dbs[0] = db
-                @with_transaction(env, db)
-                def level1(db):
-                    dbs[1] = db
-                    self.assertTrue(not db.committed and not db.rolledback)
-                    raise Error()
-                self.fail()
-            self.fail()
-        except Error:
-            pass
-        self.assertTrue(dbs[0] is not None)
-        self.assertTrue(dbs[0] is dbs[1])
-        self.assertTrue(not dbs[0].committed and dbs[0].rolledback)
-
-    def test_invalid_nesting(self):
-        env = make_env(Connection)
-        try:
-            @with_transaction(env)
-            def level0(db):
-                @with_transaction(env, Connection())
-                def level1(db):
-                    raise Error()
-                raise Error()
-            raise Error()
-        except AssertionError:
-            pass
-
-
-
-class ParseConnectionStringTestCase(unittest.TestCase):
-
-    def test_sqlite_relative(self):
-        # Default syntax for specifying DB path relative to the environment
-        # directory
-        self.assertEqual(('sqlite', {'path': 'db/trac.db'}),
-                         _parse_db_str('sqlite:db/trac.db'))
-
-    def test_sqlite_absolute(self):
-        # Standard syntax
-        self.assertEqual(('sqlite', {'path': '/var/db/trac.db'}),
-                         _parse_db_str('sqlite:///var/db/trac.db'))
-        # Legacy syntax
-        self.assertEqual(('sqlite', {'path': '/var/db/trac.db'}),
-                         _parse_db_str('sqlite:/var/db/trac.db'))
-
-    def test_sqlite_with_timeout_param(self):
-        # In-memory database
-        self.assertEqual(('sqlite', {'path': 'db/trac.db',
-                                     'params': {'timeout': '10000'}}),
-                         _parse_db_str('sqlite:db/trac.db?timeout=10000'))
-
-    def test_sqlite_windows_path(self):
-        # In-memory database
-        os_name = os.name
-        try:
-            os.name = 'nt'
-            self.assertEqual(('sqlite', {'path': 'C:/project/db/trac.db'}),
-                             _parse_db_str('sqlite:C|/project/db/trac.db'))
-        finally:
-            os.name = os_name
-
-    def test_postgres_simple(self):
-        self.assertEqual(('postgres', {'host': 'localhost', 'path': '/trac'}),
-                         _parse_db_str('postgres://localhost/trac'))
-
-    def test_postgres_with_port(self):
-        self.assertEqual(('postgres', {'host': 'localhost', 'port': 9431,
-                                       'path': '/trac'}),
-                         _parse_db_str('postgres://localhost:9431/trac'))
-
-    def test_postgres_with_creds(self):
-        self.assertEqual(('postgres', {'user': 'john', 'password': 'letmein',
-                                       'host': 'localhost', 'port': 9431,
-                                       'path': '/trac'}),
-                 _parse_db_str('postgres://john:letmein@localhost:9431/trac'))
-
-    def test_postgres_with_quoted_password(self):
-        self.assertEqual(('postgres', {'user': 'john', 'password': ':@/',
-                                       'host': 'localhost', 'path': '/trac'}),
-                     _parse_db_str('postgres://john:%3a%40%2f@localhost/trac'))
-
-    def test_mysql_simple(self):
-        self.assertEqual(('mysql', {'host': 'localhost', 'path': '/trac'}),
-                     _parse_db_str('mysql://localhost/trac'))
-
-    def test_mysql_with_creds(self):
-        self.assertEqual(('mysql', {'user': 'john', 'password': 'letmein',
-                                    'host': 'localhost', 'port': 3306,
-                                    'path': '/trac'}),
-                     _parse_db_str('mysql://john:letmein@localhost:3306/trac'))
-
-
-class StringsTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.env = EnvironmentStub()
-
-    def test_insert_unicode(self):
-        self.env.db_transaction(
-                "INSERT INTO system (name,value) VALUES (%s,%s)",
-                ('test-unicode', u'?nic?de'))
-        self.assertEqual([(u'?nic?de',)], self.env.db_query(
-            "SELECT value FROM system WHERE name='test-unicode'"))
-
-    def test_insert_empty(self):
-        from trac.util.text import empty
-        self.env.db_transaction(
-                "INSERT INTO system (name,value) VALUES (%s,%s)",
-                ('test-empty', empty))
-        self.assertEqual([(u'',)], self.env.db_query(
-            "SELECT value FROM system WHERE name='test-empty'"))
-
-    def test_insert_markup(self):
-        from genshi.core import Markup
-        self.env.db_transaction(
-                "INSERT INTO system (name,value) VALUES (%s,%s)",
-                ('test-markup', Markup(u'<em>m?rkup</em>')))
-        self.assertEqual([(u'<em>m?rkup</em>',)], self.env.db_query(
-            "SELECT value FROM system WHERE name='test-markup'"))
-
-    def test_quote(self):
-        db = self.env.get_db_cnx()
-        cursor = db.cursor()
-        cursor.execute('SELECT 1 AS %s' % \
-                       db.quote(r'alpha\`\"\'\\beta``gamma""delta'))
-        self.assertEqual(r'alpha\`\"\'\\beta``gamma""delta',
-                         get_column_names(cursor)[0])
-
-
-class ConnectionTestCase(unittest.TestCase):
-    def setUp(self):
-        self.env = EnvironmentStub()
-
-    def tearDown(self):
-        self.env.reset_db()
-
-    def test_get_last_id(self):
-        id1 = id2 = None
-        q = "INSERT INTO report (author) VALUES ('anonymous')"
-        with self.env.db_transaction as db:
-            cursor = db.cursor()
-            cursor.execute(q)
-            # Row ID correct before...
-            id1 = db.get_last_id(cursor, 'report')
-            self.assertNotEqual(0, id1)
-            db.commit()
-            cursor.execute(q)
-            # ... and after commit()
-            db.commit()
-            id2 = db.get_last_id(cursor, 'report')
-            self.assertEqual(id1 + 1, id2)
-
-    def test_update_sequence(self):
-        self.env.db_transaction(
-            "INSERT INTO report (id, author) VALUES (42, 'anonymous')")
-        with self.env.db_transaction as db:
-            cursor = db.cursor()
-            db.update_sequence(cursor, 'report', 'id')
-        self.env.db_transaction(
-            "INSERT INTO report (author) VALUES ('next-id')")
-        self.assertEqual(43, self.env.db_query(
-                "SELECT id FROM report WHERE author='next-id'")[0][0])
-
-
-def suite():
-    suite = unittest.TestSuite()
-    suite.addTest(unittest.makeSuite(ParseConnectionStringTestCase, 'test'))
-    suite.addTest(unittest.makeSuite(StringsTestCase, 'test'))
-    suite.addTest(unittest.makeSuite(ConnectionTestCase, 'test'))
-    suite.addTest(unittest.makeSuite(WithTransactionTest, 'test'))
-    return suite
-
-
-if __name__ == '__main__':
-    unittest.main(defaultTest='suite')
diff -Naur trac/db/tests/functional.py ../trac/db/tests/functional.py
--- trac/db/tests/functional.py	2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/functional.py	1970-01-01 08:00:00.000000000 +0800
@@ -1,29 +0,0 @@
-#!/usr/bin/python
-
-import os
-from trac.tests.functional import *
-
-
-class DatabaseBackupTestCase(FunctionalTestCaseSetup):
-    def runTest(self):
-        """Testing backup"""
-        env = self._testenv.get_trac_environment()
-        # raises TracError if backup fails
-        backup_file = env.backup()
-        self.assertTrue(os.path.exists(backup_file),
-                        'Backup file was not created.')
-        self.assertNotEqual(os.path.getsize(backup_file), 0,
-                            'Backup file is zero length.')
-
-
-def functionalSuite(suite=None):
-    if not suite:
-        import trac.tests.functional.testcases
-        suite = trac.tests.functional.testcases.functionalSuite()
-    suite.addTest(DatabaseBackupTestCase())
-    return suite
-
-
-if __name__ == '__main__':
-    unittest.main(defaultTest='functionalSuite')
-
diff -Naur trac/db/tests/__init__.py ../trac/db/tests/__init__.py
--- trac/db/tests/__init__.py	2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/__init__.py	1970-01-01 08:00:00.000000000 +0800
@@ -1,18 +0,0 @@
-import unittest
-
-from trac.db.tests import api, mysql_test, postgres_test, util
-
-from trac.db.tests.functional import functionalSuite
-
-def suite():
-
-    suite = unittest.TestSuite()
-    suite.addTest(api.suite())
-    suite.addTest(mysql_test.suite())
-    suite.addTest(postgres_test.suite())
-    suite.addTest(util.suite())
-    return suite
-
-if __name__ == '__main__':
-    unittest.main(defaultTest='suite')
-
diff -Naur trac/db/tests/mysql_test.py ../trac/db/tests/mysql_test.py
--- trac/db/tests/mysql_test.py	2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/mysql_test.py	1970-01-01 08:00:00.000000000 +0800
@@ -1,61 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2009 Edgewall Software
-# All rights reserved.
-#
-# This software is licensed as described in the file COPYING, which
-# you should have received as part of this distribution. The terms
-# are also available at http://trac.edgewall.org/wiki/TracLicense.
-#
-# This software consists of voluntary contributions made by many
-# individuals. For the exact contribution history, see the revision
-# history and logs, available at http://trac.edgewall.org/log/.
-
-import unittest
-
-from trac.db.mysql_backend import MySQLConnector
-from trac.test import EnvironmentStub
-
-
-class MySQLTableAlterationSQLTest(unittest.TestCase):
-    def setUp(self):
-        self.env = EnvironmentStub()
-
-    def test_alter_column_types(self):
-        connector = MySQLConnector(self.env)
-        sql = connector.alter_column_types('milestone',
-                                           {'due': ('int', 'int64'),
-                                            'completed': ('int', 'int64')})
-        sql = list(sql)
-        self.assertEqual([
-            "ALTER TABLE milestone "
-                "MODIFY completed bigint, "
-                "MODIFY due bigint",
-            ], sql)
-
-    def test_alter_column_types_same(self):
-        connector = MySQLConnector(self.env)
-        sql = connector.alter_column_types('milestone',
-                                           {'due': ('int', 'int'),
-                                            'completed': ('int', 'int64')})
-        sql = list(sql)
-        self.assertEqual([
-            "ALTER TABLE milestone "
-                "MODIFY completed bigint",
-            ], sql)
-
-    def test_alter_column_types_none(self):
-        connector = MySQLConnector(self.env)
-        sql = connector.alter_column_types('milestone',
-                                           {'due': ('int', 'int')})
-        self.assertEqual([], list(sql))
-
-
-def suite():
-    suite = unittest.TestSuite()
-    suite.addTest(unittest.makeSuite(MySQLTableAlterationSQLTest, 'test'))
-    return suite
-
-
-if __name__ == '__main__':
-    unittest.main(defaultTest='suite')
diff -Naur trac/db/tests/postgres_test.py ../trac/db/tests/postgres_test.py
--- trac/db/tests/postgres_test.py	2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/postgres_test.py	1970-01-01 08:00:00.000000000 +0800
@@ -1,158 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import re
-import unittest
-
-from trac.db import Table, Column, Index
-from trac.db.postgres_backend import PostgreSQLConnector, assemble_pg_dsn
-from trac.test import EnvironmentStub
-
-
-class PostgresTableCreationSQLTest(unittest.TestCase):
-    def setUp(self):
-        self.env = EnvironmentStub()
-
-    def _unroll_generator(self, generator):
-        items = []
-        for item in generator:
-            items.append(item)
-        return items
-
-    def _normalize_sql(self, sql_generator):
-        normalized_commands = []
-        whitespace_regex = re.compile(r'\s+')
-        commands = self._unroll_generator(sql_generator)
-        for command in commands:
-            command = command.replace('\n', '')
-            command = whitespace_regex.sub(' ', command)
-            normalized_commands.append(command)
-        return normalized_commands
-
-    def test_quote_table_name(self):
-        table = Table('foo bar')
-        table[Column('name'),]
-        sql_generator = PostgreSQLConnector(self.env).to_sql(table)
-        sql_commands = self._normalize_sql(sql_generator)
-        self.assertEqual(1, len(sql_commands))
-        self.assertEqual('CREATE TABLE "foo bar" ( "name" text)',
-                         sql_commands[0])
-
-    def test_quote_column_names(self):
-        table = Table('foo')
-        table[Column('my name'),]
-        sql_generator = PostgreSQLConnector(self.env).to_sql(table)
-        sql_commands = self._normalize_sql(sql_generator)
-        self.assertEqual(1, len(sql_commands))
-        self.assertEqual('CREATE TABLE "foo" ( "my name" text)',
-                         sql_commands[0])
-
-    def test_quote_compound_primary_key_declaration(self):
-        table = Table('foo bar', key=['my name', 'your name'])
-        table[Column('my name'), Column('your name'),]
-        sql_generator = PostgreSQLConnector(self.env).to_sql(table)
-        sql_commands = self._normalize_sql(sql_generator)
-        self.assertEqual(1, len(sql_commands))
-        expected_sql = 'CREATE TABLE "foo bar" ( "my name" text, ' + \
-                       '"your name" text, CONSTRAINT "foo bar_pk" ' +\
-                       'PRIMARY KEY ("my name","your name"))'
-        self.assertEqual(expected_sql, sql_commands[0])
-
-    def test_quote_index_declaration(self):
-        table = Table('foo')
-        table[Column('my name'), Index(['my name'])]
-        sql_generator = PostgreSQLConnector(self.env).to_sql(table)
-        sql_commands = self._normalize_sql(sql_generator)
-        self.assertEqual(2, len(sql_commands))
-        self.assertEqual('CREATE TABLE "foo" ( "my name" text)',
-                         sql_commands[0])
-        index_sql = 'CREATE INDEX "foo_my name_idx" ON "foo" ("my name")'
-        self.assertEqual(index_sql, sql_commands[1])
-
-    def test_quote_index_declaration_for_multiple_indexes(self):
-        table = Table('foo')
-        table[Column('a'), Column('b'),
-              Index(['a', 'b'])]
-        sql_generator = PostgreSQLConnector(self.env).to_sql(table)
-        sql_commands = self._normalize_sql(sql_generator)
-        self.assertEqual(2, len(sql_commands))
-        self.assertEqual('CREATE TABLE "foo" ( "a" text, "b" text)',
-                         sql_commands[0])
-        index_sql = 'CREATE INDEX "foo_a_b_idx" ON "foo" ("a","b")'
-        self.assertEqual(index_sql, sql_commands[1])
-
-    def test_assemble_dsn(self):
-        values = [
-            {'path': 't', 'user': 't'},
-            {'path': 't', 'password': 't'},
-            {'path': 't', 'host': 't'},
-            {'path': 't', 'port': 't'},
-            {'path': 't', 'password': 't', 'user': 't'},
-            {'path': 't', 'host': 't', 'user': 't'},
-            {'path': 't', 'user': 't', 'port': 't'},
-            {'path': 't', 'host': 't', 'password': 't'},
-            {'path': 't', 'password': 't', 'port': 't'},
-            {'path': 't', 'host': 't', 'port': 't'},
-            {'path': 't', 'host': 't', 'password': 't', 'user': 't'},
-            {'path': 't', 'password': 't', 'user': 't', 'port': 't'},
-            {'path': 't', 'host': 't', 'user': 't', 'port': 't'},
-            {'path': 't', 'host': 't', 'password': 't', 'port': 't'},
-        ]
-        for orig in values:
-            dsn = assemble_pg_dsn(**orig)
-            for k, v in orig.iteritems():
-                orig[k] = "'%s'" % v
-                continue
-            orig['dbname'] = "'t'"
-            del orig['path']
-            new_values = {'dbname': "'t'"}
-            for key_value in dsn.split(' '):
-                k, v = key_value.split('=')
-                new_values[k] = v
-                continue
-            self.assertEqual(new_values, orig)
-            continue
-
-
-class PostgresTableAlterationSQLTest(unittest.TestCase):
-    def setUp(self):
-        self.env = EnvironmentStub()
-
-    def test_alter_column_types(self):
-        connector = PostgreSQLConnector(self.env)
-        sql = connector.alter_column_types('milestone',
-                                           {'due': ('int', 'int64'),
-                                            'completed': ('int', 'int64')})
-        sql = list(sql)
-        self.assertEqual([
-            "ALTER TABLE milestone "
-                "ALTER COLUMN completed TYPE bigint, "
-                "ALTER COLUMN due TYPE bigint",
-            ], sql)
-
-    def test_alter_column_types_same(self):
-        connector = PostgreSQLConnector(self.env)
-        sql = connector.alter_column_types('milestone',
-                                           {'due': ('int', 'int'),
-                                            'completed': ('int', 'int64')})
-        sql = list(sql)
-        self.assertEqual([
-            "ALTER TABLE milestone "
-                "ALTER COLUMN completed TYPE bigint",
-            ], sql)
-
-    def test_alter_column_types_none(self):
-        connector = PostgreSQLConnector(self.env)
-        sql = connector.alter_column_types('milestone',
-                                           {'due': ('int', 'int')})
-        self.assertEqual([], list(sql))
-
-
-def suite():
-    suite = unittest.TestSuite()
-    suite.addTest(unittest.makeSuite(PostgresTableCreationSQLTest, 'test'))
-    suite.addTest(unittest.makeSuite(PostgresTableAlterationSQLTest, 'test'))
-    return suite
-
-
-if __name__ == '__main__':
-    unittest.main(defaultTest='suite')
diff -Naur trac/db/tests/util.py ../trac/db/tests/util.py
--- trac/db/tests/util.py	2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/tests/util.py	1970-01-01 08:00:00.000000000 +0800
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2010 Edgewall Software
-# All rights reserved.
-#
-# This software is licensed as described in the file COPYING, which
-# you should have received as part of this distribution. The terms
-# are also available at http://trac.edgewall.org/wiki/TracLicense.
-#
-# This software consists of voluntary contributions made by many
-# individuals. For the exact contribution history, see the revision
-# history and logs, available at http://trac.edgewall.org/log/.
-
-import unittest
-
-from trac.db.util import sql_escape_percent
-
-# TODO: test IterableCursor, ConnectionWrapper
-
-class SQLEscapeTestCase(unittest.TestCase):
-    def test_sql_escape_percent(self):
-        self.assertEqual("%", sql_escape_percent("%"))
-        self.assertEqual("'%%'", sql_escape_percent("'%'"))
-        self.assertEqual("''%''", sql_escape_percent("''%''"))
-        self.assertEqual("'''%%'''", sql_escape_percent("'''%'''"))
-        self.assertEqual("'''%%'", sql_escape_percent("'''%'"))
-        self.assertEqual("%s", sql_escape_percent("%s"))
-        self.assertEqual("% %", sql_escape_percent("% %"))
-        self.assertEqual("%s %i", sql_escape_percent("%s %i"))
-        self.assertEqual("'%%s'", sql_escape_percent("'%s'"))
-        self.assertEqual("'%% %%'", sql_escape_percent("'% %'"))
-        self.assertEqual("'%%s %%i'", sql_escape_percent("'%s %i'"))
-
-
-def suite():
-    suite = unittest.TestSuite()
-    suite.addTest(unittest.makeSuite(SQLEscapeTestCase, 'test'))
-    return suite
-
-if __name__ == '__main__':
-    unittest.main(defaultTest='suite')
diff -Naur trac/db/util.py ../trac/db/util.py
--- trac/db/util.py	2013-02-01 08:47:40.000000000 +0800
+++ ../trac/db/util.py	2013-08-27 12:17:32.795244987 +0800
@@ -86,6 +86,7 @@
         return self.cursor.executemany(sql, args)
 
 
+
 class ConnectionWrapper(object):
     """Generic wrapper around connection objects.
 
@@ -97,6 +98,12 @@
     """
     __slots__ = ('cnx', 'log', 'readonly')
 
+    def commit(self):
+        return self.cnx.connection().cursor().execute("commit")
+
+    def roolback(self):
+        return self.cnx.connection().cursor().execute("rollback")
+
     def __init__(self, cnx, log=None, readonly=False):
         self.cnx = cnx
         self.log = log
@@ -153,3 +160,10 @@
         if self.readonly and not dql:
             raise ValueError("a 'readonly' connection can only do a SELECT")
         return dql
+
+try: 
+    from DBUtils.PooledDB import PooledDB as connect 
+except ImportError: 
+    def connect(dbapi, maxusage=0, setsession=None, *args, **kwargs): 
+        return dbapi.connect(*args, **kwargs)
+

Reply via email to