------------------------------------------------------------
revno: 236
committer: Siegfried-Angel Gevatter Pujals <siegfr...@gevatter.com>
branch nick: bluebird
timestamp: Wed 2011-09-07 22:48:27 +0200
message:
  Fix signal test cases so extensions are detected correctly.
removed:
  test/dbus/remote-test.orig.py
modified:
  test/dbus/blacklist-test.py
  test/dbus/dsr-test.py
  test/dbus/monitor-test.py
  test/dbus/testutils.py
--
lp:~zeitgeist/zeitgeist/bluebird
https://code.launchpad.net/~zeitgeist/zeitgeist/bluebird

Your team Zeitgeist Framework Team is subscribed to branch lp:~zeitgeist/zeitgeist/bluebird.
To unsubscribe from this branch go to https://code.launchpad.net/~zeitgeist/zeitgeist/bluebird/+edit-subscription
=== modified file 'test/dbus/blacklist-test.py' --- test/dbus/blacklist-test.py 2011-09-07 16:50:05 +0000 +++ test/dbus/blacklist-test.py 2011-09-07 20:48:27 +0000 @@ -32,7 +32,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from zeitgeist.client import ZeitgeistDBusInterface from zeitgeist.datamodel import * -from testutils import RemoteTestCase +from testutils import RemoteTestCase, asyncTestMethod class BlacklistTest(RemoteTestCase): @@ -121,6 +121,7 @@ global hit hit = 0 + @asyncTestMethod(mainloop) def cb_added(template_id, event_template): global hit print "*** cb_added with hit", hit @@ -129,6 +130,7 @@ self.assertEquals(template_id, "TestTemplate") self.assertEventsEqual(template1, event_template) + @asyncTestMethod(mainloop) def cb_removed(template_id, event_template): global hit self.assertEquals(hit, 1) === modified file 'test/dbus/dsr-test.py' --- test/dbus/dsr-test.py 2011-09-05 13:09:08 +0000 +++ test/dbus/dsr-test.py 2011-09-07 20:48:27 +0000 @@ -45,7 +45,7 @@ TimeRange, StorageState, DataSource, NULL_EVENT, ResultType) import testutils -from testutils import parse_events, import_events +from testutils import parse_events, import_events, asyncTestMethod class ZeitgeistRemoteDataSourceRegistryTest(testutils.RemoteTestCase): @@ -208,11 +208,13 @@ global hit hit = 0 + @asyncTestMethod(mainloop) def cb_registered(datasource): global hit self.assertEquals(hit, 0) hit = 1 + @asyncTestMethod(mainloop) def cb_enabled(unique_id, enabled): global hit if hit == 1: @@ -228,6 +230,7 @@ else: self.fail("Unexpected number of signals: %d." 
% hit) + #@asyncTestMethod(mainloop) #def cb_disconnect(datasource): # self.assertEquals(hit, 3) # mainloop.quit() @@ -245,6 +248,7 @@ def testRegisterDataSourceEnabledCallbackOnRegister(self): mainloop = self.create_mainloop() + @asyncTestMethod(mainloop) def callback(enabled): mainloop.quit() self.client.register_data_source(*self._ds1, enabled_callback=callback) @@ -257,6 +261,7 @@ hit = 0 # Register a callback + @asyncTestMethod(mainloop) def callback(enabled): global hit if hit == 0: === modified file 'test/dbus/monitor-test.py' --- test/dbus/monitor-test.py 2011-08-28 17:30:10 +0000 +++ test/dbus/monitor-test.py 2011-09-07 20:48:27 +0000 @@ -45,7 +45,7 @@ TimeRange, StorageState, DataSource, NULL_EVENT, ResultType) import testutils -from testutils import parse_events, import_events +from testutils import parse_events, import_events, asyncTestMethod class ZeitgeistMonitorTest(testutils.RemoteTestCase): @@ -56,10 +56,12 @@ tmpl = Event.new_for_values(interpretation="stfu:OpenEvent") events = parse_events("test/data/five_events.js") + @asyncTestMethod(mainloop) def notify_insert_handler(time_range, events): result.extend(events) mainloop.quit() + @asyncTestMethod(mainloop) def notify_delete_handler(time_range, event_ids): mainloop.quit() self.fail("Unexpected delete notification") @@ -78,10 +80,12 @@ subjects=[Subject.new_for_values(uri="file:///tmp/bar.txt")]) events = parse_events("test/data/five_events.js") + @asyncTestMethod(mainloop) def notify_insert_handler(time_range, events): result.extend(events) mainloop.quit() + @asyncTestMethod(mainloop) def notify_delete_handler(time_range, event_ids): mainloop.quit() self.fail("Unexpected delete notification") @@ -100,10 +104,12 @@ subjects=[Subject.new_for_values(uri="file:///tmp/*")]) events = parse_events("test/data/five_events.js") + @asyncTestMethod(mainloop) def notify_insert_handler(time_range, events): result.extend(events) mainloop.quit() + @asyncTestMethod(mainloop) def notify_delete_handler(time_range, 
event_ids): mainloop.quit() self.fail("Unexpected delete notification") @@ -122,10 +128,12 @@ subjects=[Subject.new_for_values(uri="!file:///tmp/*")]) events = parse_events("test/data/five_events.js") + @asyncTestMethod(mainloop) def notify_insert_handler(time_range, events): result.extend(events) mainloop.quit() + @asyncTestMethod(mainloop) def notify_delete_handler(time_range, event_ids): mainloop.quit() self.fail("Unexpected delete notification") @@ -144,10 +152,12 @@ subjects=[Subject.new_for_values(uri="!file:///tmp/bar.txt")]) events = parse_events("test/data/five_events.js") + @asyncTestMethod(mainloop) def notify_insert_handler(time_range, events): result.extend(events) mainloop.quit() + @asyncTestMethod(mainloop) def notify_delete_handler(time_range, event_ids): mainloop.quit() self.fail("Unexpected delete notification") @@ -164,15 +174,16 @@ mainloop = self.create_mainloop() events = parse_events("test/data/five_events.js") + @asyncTestMethod(mainloop) def notify_insert_handler(time_range, events): event_ids = map(lambda ev : ev.id, events) self.client.delete_events(event_ids) + @asyncTestMethod(mainloop) def notify_delete_handler(time_range, event_ids): mainloop.quit() result.extend(event_ids) - self.client.install_monitor(TimeRange(125, 145), [], notify_insert_handler, notify_delete_handler) @@ -186,16 +197,19 @@ mainloop = self.create_mainloop(None) events = parse_events("test/data/five_events.js") + @asyncTestMethod(mainloop) def timeout(): # We want this timeout - we should not get informed # about deletions of non-existing events mainloop.quit() return False + @asyncTestMethod(mainloop) def notify_insert_handler(time_range, events): event_ids = map(lambda ev : ev.id, events) self.client.delete_events([9999999]) + @asyncTestMethod(mainloop) def notify_delete_handler(time_range, event_ids): mainloop.quit() self.fail("Notified about deletion of non-existing events %s", events) @@ -213,18 +227,22 @@ mainloop = self.create_mainloop() events = 
parse_events("test/data/five_events.js") + @asyncTestMethod(mainloop) def check_ok(): if len(result1) == 2 and len(result2) == 2: mainloop.quit() + @asyncTestMethod(mainloop) def notify_insert_handler1(time_range, events): event_ids = map(lambda ev : ev.id, events) self.client.delete_events(event_ids) + @asyncTestMethod(mainloop) def notify_delete_handler1(time_range, event_ids): result1.extend(event_ids) check_ok() + @asyncTestMethod(mainloop) def notify_delete_handler2(time_range, event_ids): result2.extend(event_ids) check_ok() @@ -246,9 +264,11 @@ mainloop = self.create_mainloop() tmpl = Event.new_for_values(interpretation="stfu:OpenEvent") + @asyncTestMethod(mainloop) def notify_insert_handler(notification_type, events): pass + @asyncTestMethod(mainloop) def notify_delete_handler(time_range, event_ids): mainloop.quit() self.fail("Unexpected delete notification") @@ -256,6 +276,7 @@ mon = self.client.install_monitor(TimeRange.always(), [tmpl], notify_insert_handler, notify_delete_handler) + @asyncTestMethod(mainloop) def removed_handler(result_state): result.append(result_state) mainloop.quit() === removed file 'test/dbus/remote-test.orig.py' --- test/dbus/remote-test.orig.py 2011-07-28 18:41:01 +0000 +++ test/dbus/remote-test.orig.py 1970-01-01 00:00:00 +0000 @@ -1,623 +0,0 @@ -#! /usr/bin/python -# -.- coding: utf-8 -.- - -# Zeitgeist -# -# Copyright © 2009-2011 Seif Lotfy <s...@lotfy.com> -# Copyright © 2009-2011 Siegfried-Angel Gevatter Pujals <siegfr...@gevatter.com> -# Copyright © 2009-2011 Mikkel Kamstrup Erlandsen <mikkel.kamst...@gmail.com> -# Copyright © 2009-2011 Markus Korn <thek...@gmx.de> -# Copyright © 2011 Collabora Ltd. 
-# By Siegfried-Angel Gevatter Pujals <siegfr...@gevatter.com> -# By Seif Lotfy <s...@lotfy.com> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as published by -# the Free Software Foundation, either version 2.1 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import unittest -import os -import sys -import logging -import signal -import time -import tempfile -import shutil -import pickle -from subprocess import Popen, PIPE - -# DBus setup -import gobject -from dbus.mainloop.glib import DBusGMainLoop -DBusGMainLoop(set_as_default=True) -from dbus.exceptions import DBusException - -from zeitgeist.datamodel import (Event, Subject, Interpretation, Manifestation, - TimeRange, StorageState, DataSource) - -import testutils -from testutils import parse_events - -class ZeitgeistRemoteAPITest(testutils.RemoteTestCase): - - def __init__(self, methodName): - super(ZeitgeistRemoteAPITest, self).__init__(methodName) - - def testInsertAndGetEvent(self): - ev = Event.new_for_values(timestamp=123, - interpretation=Interpretation.ACCESS_EVENT, - manifestation=Manifestation.USER_ACTIVITY, - actor="Freak Mamma") - subj = Subject.new_for_values(uri="void://foobar", - interpretation=Interpretation.DOCUMENT, - manifestation=Manifestation.FILE_DATA_OBJECT) - ev.append_subject(subj) - ids = self.insertEventsAndWait([ev]) - events = self.getEventsAndWait(ids) - self.assertEquals(1, len(ids)) - self.assertEquals(1, len(events)) - - ev = events[0] - self.assertTrue(isinstance(ev, Event)) - 
self.assertEquals("123", ev.timestamp) - self.assertEquals(Interpretation.ACCESS_EVENT, ev.interpretation) - self.assertEquals(Manifestation.USER_ACTIVITY, ev.manifestation) - self.assertEquals("Freak Mamma", ev.actor) - self.assertEquals(1, len(ev.subjects)) - self.assertEquals("void://foobar", ev.subjects[0].uri) - self.assertEquals(Interpretation.DOCUMENT, ev.subjects[0].interpretation) - self.assertEquals(Manifestation.FILE_DATA_OBJECT, ev.subjects[0].manifestation) - - def testFindTwoOfThreeEvents(self): - ev1 = Event.new_for_values(timestamp=400, - interpretation=Interpretation.ACCESS_EVENT, - manifestation=Manifestation.USER_ACTIVITY, - actor="Boogaloo") - ev2 = Event.new_for_values(timestamp=500, - interpretation=Interpretation.ACCESS_EVENT, - manifestation=Manifestation.USER_ACTIVITY, - actor="Boogaloo") - ev3 = Event.new_for_values(timestamp=600, - interpretation=Interpretation.SEND_EVENT, - manifestation=Manifestation.USER_ACTIVITY, - actor="Boogaloo") - subj1 = Subject.new_for_values(uri="foo://bar", - interpretation=Interpretation.DOCUMENT, - manifestation=Manifestation.FILE_DATA_OBJECT) - subj2 = Subject.new_for_values(uri="foo://baz", - interpretation=Interpretation.IMAGE, - manifestation=Manifestation.FILE_DATA_OBJECT) - subj3 = Subject.new_for_values(uri="foo://quiz", - interpretation=Interpretation.AUDIO, - manifestation=Manifestation.FILE_DATA_OBJECT) - ev1.append_subject(subj1) - ev2.append_subject(subj1) - ev2.append_subject(subj2) - ev3.append_subject(subj2) - ev3.append_subject(subj3) - ids = self.insertEventsAndWait([ev1, ev2, ev3]) - self.assertEquals(3, len(ids)) - - events = self.getEventsAndWait(ids) - self.assertEquals(3, len(events)) - for event in events: - self.assertTrue(isinstance(event, Event)) - self.assertEquals(Manifestation.USER_ACTIVITY, event.manifestation) - self.assertEquals("Boogaloo", event.actor) - - # Search for everything - ids = self.findEventIdsAndWait([], num_events=3) - self.assertEquals(3, len(ids)) # (we can not 
trust the ids because we don't have a clean test environment) - - # Search for some specific templates - subj_templ1 = Subject.new_for_values(manifestation=Manifestation.FILE_DATA_OBJECT) - subj_templ2 = Subject.new_for_values(interpretation=Interpretation.IMAGE) - event_template = Event.new_for_values( - actor="Boogaloo", - interpretation=Interpretation.ACCESS_EVENT, - subjects=[subj_templ1,subj_templ2]) - ids = self.findEventIdsAndWait([event_template], - num_events=10) - self.assertEquals(1, len(ids)) - - def testUnicodeInsert(self): - events = parse_events("test/data/unicode_event.js") - ids = self.insertEventsAndWait(events) - self.assertEquals(len(ids), len(events)) - result_events = self.getEventsAndWait(ids) - self.assertEquals(len(ids), len(result_events)) - - def testGetEvents(self): - events = parse_events("test/data/five_events.js") - ids = self.insertEventsAndWait(events) + [1000, 2000] - result = self.getEventsAndWait(ids) - self.assertEquals(len(filter(None, result)), len(events)) - self.assertEquals(len(filter(lambda event: event is None, result)), 2) - - def testMonitorInsertEvents(self): - result = [] - mainloop = self.create_mainloop() - tmpl = Event.new_for_values(interpretation="stfu:OpenEvent") - events = parse_events("test/data/five_events.js") - - def notify_insert_handler(time_range, events): - result.extend(events) - mainloop.quit() - - def notify_delete_handler(time_range, event_ids): - mainloop.quit() - self.fail("Unexpected delete notification") - - self.client.install_monitor(TimeRange.always(), [tmpl], - notify_insert_handler, notify_delete_handler) - self.client.insert_events(events) - mainloop.run() - - self.assertEquals(2, len(result)) - - def testMonitorDeleteEvents(self): - result = [] - mainloop = self.create_mainloop() - events = parse_events("test/data/five_events.js") - - def notify_insert_handler(time_range, events): - event_ids = map(lambda ev : ev.id, events) - self.client.delete_events(event_ids) - - def 
notify_delete_handler(time_range, event_ids): - mainloop.quit() - result.extend(event_ids) - - - self.client.install_monitor(TimeRange(125, 145), [], - notify_insert_handler, notify_delete_handler) - - self.client.insert_events(events) - mainloop.run() - - self.assertEquals(2, len(result)) - - def testMonitorDeleteNonExistingEvent(self): - result = [] - mainloop = self.create_mainloop(None) - events = parse_events("test/data/five_events.js") - - def timeout(): - # We want this timeout - we should not get informed - # about deletions of non-existing events - mainloop.quit() - return False - - def notify_insert_handler(time_range, events): - event_ids = map(lambda ev : ev.id, events) - self.client.delete_events([9999999]) - - def notify_delete_handler(time_range, event_ids): - mainloop.quit() - self.fail("Notified about deletion of non-existing events %s", events) - - self.client.install_monitor(TimeRange(125, 145), [], - notify_insert_handler, notify_delete_handler) - - gobject.timeout_add_seconds(5, timeout) - self.client.insert_events(events) - mainloop.run() - - def testTwoMonitorsDeleteEvents(self): - result1 = [] - result2 = [] - mainloop = self.create_mainloop() - events = parse_events("test/data/five_events.js") - - def check_ok(): - if len(result1) == 2 and len(result2) == 2: - mainloop.quit() - - def notify_insert_handler1(time_range, events): - event_ids = map(lambda ev : ev.id, events) - self.client.delete_events(event_ids) - - def notify_delete_handler1(time_range, event_ids): - result1.extend(event_ids) - check_ok() - - def notify_delete_handler2(time_range, event_ids): - result2.extend(event_ids) - check_ok() - - self.client.install_monitor(TimeRange(125, 145), [], - notify_insert_handler1, notify_delete_handler1) - - self.client.install_monitor(TimeRange(125, 145), [], - lambda x, y: x, notify_delete_handler2) - - self.client.insert_events(events) - mainloop.run() - - self.assertEquals(2, len(result1)) - self.assertEquals(2, len(result2)) - - def 
testMonitorInstallRemoval(self): - result = [] - mainloop = self.create_mainloop() - tmpl = Event.new_for_values(interpretation="stfu:OpenEvent") - - def notify_insert_handler(notification_type, events): - pass - - def notify_delete_handler(time_range, event_ids): - mainloop.quit() - self.fail("Unexpected delete notification") - - mon = self.client.install_monitor(TimeRange.always(), [tmpl], - notify_insert_handler, notify_delete_handler) - - def removed_handler(result_state): - result.append(result_state) - mainloop.quit() - - self.client.remove_monitor(mon, removed_handler) - mainloop.run() - self.assertEquals(1, len(result)) - self.assertEquals(1, result.pop()) - - def testDeleteEvents(self): - """ delete all events with actor == firefox """ - events = parse_events("test/data/five_events.js") - self.insertEventsAndWait(events) - - event = Event() - event.actor = "firefox" - - # get event ids with actor == firefox - ff_ids = self.findEventIdsAndWait([event]) - # delete this events - time_range = self.deleteEventsAndWait(ff_ids) - # got timerange of deleted events - self.assertEquals(2, len(time_range)) - # get all events, the one with actor == firefox should - # not be there - ids = self.findEventIdsAndWait([]) - self.assertEquals(2, len(ids)) - self.assertEquals(0, len(set(ff_ids) & set(ids))) - - def testFindByRandomActorAndGet(self): - events = parse_events("test/data/five_events.js") - self.insertEventsAndWait(events) - - template = Event.new_for_values(actor="/usr/bliblablu") - - ids = self.findEventIdsAndWait([template]) - self.assertEquals(len(ids), 0) - - events = self.getEventsAndWait(ids) - self.assertEquals(len(events), 0) - - def testFindRelated(self): - events = parse_events("test/data/apriori_events.js") - self.insertEventsAndWait(events) - - uris = self.findRelatedAndWait(["i2"], num_events=2, result_type=1) - self.assertEquals(uris, ["i3", "i1"]) - - uris = self.findRelatedAndWait(["i2"], num_events=2, result_type=0) - self.assertEquals(uris, 
["i1", "i3"]) - - def testFindEventsForValues(self): - events = parse_events("test/data/apriori_events.js") - self.insertEventsAndWait(events) - - result = self.findEventsForValuesAndWait(actor="firefox", num_events=1) - self.assertEquals(len(result), 1) - self.assertEquals(result[0].actor, "firefox") - - def testFindEventsWithStringPayload(self): - mainloop = self.create_mainloop() - payload = "Hello World" - def callback(ids): - def callback2(events): - mainloop.quit() - self.assertEquals(events[0].payload, map(ord, payload)) - self.client.get_events(ids, callback2) - events = [Event.new_for_values(actor=u"boo", timestamp=124, subject_uri="file://yomomma")] - events[0].payload = payload - self.client.insert_events(events, callback) - mainloop.run() - - def testFindEventsWithNonASCIIPayload(self): - mainloop = self.create_mainloop() - payload = u"äöü".encode("utf-8") - def callback(ids): - def callback2(events): - mainloop.quit() - self.assertEquals(events[0].payload, map(ord, payload)) - self.client.get_events(ids, callback2) - events = [Event.new_for_values(actor=u"boo", timestamp=124, subject_uri="file://yomomma")] - events[0].payload = payload - self.client.insert_events(events, callback) - mainloop.run() - - def testFindEventsWithBinaryPayload(self): - mainloop = self.create_mainloop() - payload = pickle.dumps(1234) - def callback(ids): - def callback2(events): - mainloop.quit() - self.assertEquals(events[0].payload, map(ord, payload)) - self.client.get_events(ids, callback2) - events = [Event.new_for_values(actor=u"boo", timestamp=124, subject_uri="file://yomomma")] - events[0].payload = payload - self.client.insert_events(events, callback) - mainloop.run() - -class ZeitgeistRemoteInterfaceTest(unittest.TestCase): - - def setUp(self): - from _zeitgeist import engine - from _zeitgeist.engine import sql, constants - engine._engine = None - sql.unset_cursor() - self.saved_data = { - "datapath": constants.DATA_PATH, - "database": constants.DATABASE_FILE, - 
"extensions": constants.USER_EXTENSION_PATH, - } - constants.DATA_PATH = tempfile.mkdtemp(prefix="zeitgeist.datapath.") - constants.DATABASE_FILE = ":memory:" - constants.USER_EXTENSION_PATH = os.path.join(constants.DATA_PATH, "extensions") - - def tearDown(self): - from _zeitgeist.engine import constants - shutil.rmtree(constants.DATA_PATH) - constants.DATA_PATH = self.saved_data["datapath"] - constants.DATABASE_FILE = self.saved_data["database"] - constants.USER_EXTENSION_PATH = self.saved_data["extensions"] - - def testQuit(self): - """calling Quit() on the remote interface should shutdown the - engine in a clean way""" - from _zeitgeist.engine.remote import RemoteInterface - interface = RemoteInterface() - self.assertEquals(interface._engine.is_closed(), False) - interface.Quit() - self.assertEquals(interface._engine.is_closed(), True) - - -class ZeitgeistRemoteDataSourceRegistryTest(testutils.RemoteTestCase): - - _ds1 = [ - "www.example.com/foo", - "Foo Source", - "Awakes the foo in you", - [ - Event.new_for_values(subject_manifestation = "!stfu:File"), - Event.new_for_values(interpretation = "stfu:CreateEvent") - ], - ] - - _ds2 = [ - "www.example.org/bar", - u"© mwhahaha çà éü", - u"Thŧ ıs teĦ ünâçÃÃe¡", - [] - ] - - _ds2b = [ - "www.example.org/bar", # same unique ID as _ds2 - u"This string has been translated into the ASCII language", - u"Now the unicode is gone :(", - [ - Event.new_for_values(subject_manifestation = "nah"), - ], - ] - - def __init__(self, methodName): - super(ZeitgeistRemoteDataSourceRegistryTest, self).__init__(methodName) - - def _assertDataSourceEquals(self, dsdbus, dsref): - self.assertEquals(dsdbus[DataSource.UniqueId], dsref[0]) - self.assertEquals(dsdbus[DataSource.Name], dsref[1]) - self.assertEquals(dsdbus[DataSource.Description], dsref[2]) - self.assertEquals(len(dsdbus[DataSource.EventTemplates]), len(dsref[3])) - #for i, template in enumerate(dsref[3]): - # tmpl = dsdbus[DataSource.EventTemplates][i] - # 
self.assertEquals(ZgEvent.get_plain(tmpl), ZgEvent.get_plain(template)) - - def testPresence(self): - """ Ensure that the DataSourceRegistry extension is there """ - iface = self.client._iface # we know that client._iface is as clean as possible - registry = iface.get_extension("DataSourceRegistry", "data_source_registry") - registry.GetDataSources() - - def testGetDataSourcesEmpty(self): - self.assertEquals(self.client._registry.GetDataSources(), []) - - def testRegisterDataSource(self): - self.client.register_data_source(*self._ds1) - datasources = list(self.client._registry.GetDataSources()) - self.assertEquals(len(datasources), 1) - self._assertDataSourceEquals(datasources[0], self._ds1) - - def testRegisterDataSourceUnicode(self): - self.client.register_data_source(*self._ds2) - datasources = list(self.client._registry.GetDataSources()) - self.assertEquals(len(datasources), 1) - self._assertDataSourceEquals(datasources[0], self._ds2) - - def testRegisterDataSourceWithCallback(self): - self.client.register_data_source(*self._ds1, enabled_callback=lambda x: True) - - def testRegisterDataSources(self): - # Insert two data-sources - self.client._registry.RegisterDataSource(*self._ds1) - self.client._registry.RegisterDataSource(*self._ds2) - - # Verify that they have been inserted correctly - datasources = list(self.client._registry.GetDataSources()) - self.assertEquals(len(datasources), 2) - datasources.sort(key=lambda x: x[DataSource.UniqueId]) - self._assertDataSourceEquals(datasources[0], self._ds1) - self._assertDataSourceEquals(datasources[1], self._ds2) - - # Change the information of the second data-source - self.client._registry.RegisterDataSource(*self._ds2b) - - # Verify that it changed correctly - datasources = list(self.client._registry.GetDataSources()) - self.assertEquals(len(datasources), 2) - datasources.sort(key=lambda x: x[DataSource.UniqueId]) - self._assertDataSourceEquals(datasources[0], self._ds1) - 
self._assertDataSourceEquals(datasources[1], self._ds2b) - - def testSetDataSourceEnabled(self): - # Insert a data-source -- it should be enabled by default - self.client._registry.RegisterDataSource(*self._ds1) - ds = list(self.client._registry.GetDataSources())[0] - self.assertEquals(ds[DataSource.Enabled], True) - - # Now we can choose to disable it... - self.client._registry.SetDataSourceEnabled(self._ds1[0], False) - ds = list(self.client._registry.GetDataSources())[0] - self.assertEquals(ds[DataSource.Enabled], False) - - # And enable it again! - self.client._registry.SetDataSourceEnabled(self._ds1[0], True) - ds = list(self.client._registry.GetDataSources())[0] - self.assertEquals(ds[DataSource.Enabled], True) - - def testGetDataSourceFromId(self): - # Insert a data-source -- and then retrieve it by id - self.client._registry.RegisterDataSource(*self._ds1) - ds = self.client._registry.GetDataSourceFromId(self._ds1[0]) - self._assertDataSourceEquals(ds, self._ds1) - - # Retrieve a data-source from an id that has not been registered - self.assertRaises(DBusException, - self.client._registry.GetDataSourceFromId, - self._ds2[0]) - - def testDataSourceSignals(self): - mainloop = self.create_mainloop() - - global hit - hit = 0 - - def cb_registered(datasource): - global hit - self.assertEquals(hit, 0) - hit = 1 - - def cb_enabled(unique_id, enabled): - global hit - if hit == 1: - self.assertEquals(enabled, False) - hit = 2 - elif hit == 2: - self.assertEquals(enabled, True) - hit = 3 - # We're done -- change this if we figure out how to force a - # disconnection from the bus, so we can also check the - # DataSourceDisconnected signal. - mainloop.quit() - else: - self.fail("Unexpected number of signals: %d." 
% hit) - - #def cb_disconnect(datasource): - # self.assertEquals(hit, 3) - # mainloop.quit() - - # Connect to signals - self.client._registry.connect('DataSourceRegistered', cb_registered) - self.client._registry.connect('DataSourceEnabled', cb_enabled) - #self.client._registry.connect('DataSourceDisconnected', cb_disconnect) - - # Register data-source, disable it, enable it again - gobject.idle_add(self.testSetDataSourceEnabled) - - mainloop.run() - - def testRegisterDataSourceEnabledCallbackOnRegister(self): - mainloop = self.create_mainloop() - - def callback(enabled): - mainloop.quit() - self.client.register_data_source(*self._ds1, enabled_callback=callback) - - mainloop.run() - - def testRegisterDataSourceEnabledCallbackOnChange(self): - mainloop = self.create_mainloop() - global hit - hit = 0 - - # Register a callback - def callback(enabled): - global hit - if hit == 0: - # Register callback - hit = 1 - elif hit == 1: - # Disable callback - mainloop.quit() - else: - self.fail("Unexpected number of signals: %d." 
% hit) - self.client.register_data_source(*self._ds1) - self.client.set_data_source_enabled_callback(self._ds1[0], callback) - - # Disable the data-source - self.client._registry.SetDataSourceEnabled(self._ds1[0], False) - - mainloop.run() - -class ZeitgeistRemotePropertiesTest(testutils.RemoteTestCase): - - def __init__(self, methodName): - super(ZeitgeistRemotePropertiesTest, self).__init__(methodName) - - def testVersion(self): - self.assertTrue(len(self.client.get_version()) >= 2) - - def testExtensions(self): - self.assertEquals( - sorted(self.client.get_extensions()), - ["Blacklist", "DataSourceRegistry"] - ) - self.assertEquals( - sorted(self.client._iface.extensions()), - ["Blacklist", "DataSourceRegistry"] - ) - - -class ZeitgeistDaemonTest(unittest.TestCase): - - def setUp(self): - self.env = os.environ.copy() - self.datapath = tempfile.mkdtemp(prefix="zeitgeist.datapath.") - self.env.update({ - "ZEITGEIST_DATABASE_PATH": ":memory:", - "ZEITGEIST_DATA_PATH": self.datapath, - }) - - def tearDown(self): - shutil.rmtree(self.datapath) - - def testSIGHUP(self): - """sending a SIGHUP signal to a running deamon instance results - in a clean shutdown""" - daemon = testutils.RemoteTestCase._safe_start_daemon(env=self.env) - os.kill(daemon.pid, signal.SIGHUP) - err = daemon.wait() - self.assertEqual(err, 0) - - -if __name__ == "__main__": - unittest.main() === modified file 'test/dbus/testutils.py' --- test/dbus/testutils.py 2011-09-05 13:09:08 +0000 +++ test/dbus/testutils.py 2011-09-07 20:48:27 +0000 @@ -85,6 +85,22 @@ events = parse_events(path) return engine.insertEventsAndWait(events) +def asyncTestMethod(mainloop): + """ + Any callbacks happening in a MainLoopWithFailure must use + this decorator for exceptions raised inside them to be visible. 
+ """ + def wrap(f): + def new_f(*args, **kwargs): + try: + f(*args, **kwargs) + except AssertionError, e: + mainloop.fail("Assertion failed: %s" % e) + except Exception, e: + mainloop.fail("Unexpected exception: %s" % e) + return new_f + return wrap + class RemoteTestCase (unittest.TestCase): """ Helper class to implement unit tests against a @@ -295,6 +311,9 @@ def create_mainloop(timeout=5): class MainLoopWithFailure(object): + """ + Remember to wrap callbacks using the asyncTestMethod decorator. + """ def __init__(self): self._mainloop = gobject.MainLoop()
_______________________________________________
Mailing list: https://launchpad.net/~zeitgeist
Post to : zeitgeist@lists.launchpad.net
Unsubscribe : https://launchpad.net/~zeitgeist
More help : https://help.launchpad.net/ListHelp