Mark Michelson has submitted this change and it was merged. Change subject: stasis: set a channel variable on websocket disconnect error ......................................................................
stasis: set a channel variable on websocket disconnect error This test is to ensure Asterisk applies the correct state to the channel variable, STASISSTATUS. STASISSTATUS was introduced as a means for Stasis applications to have context when errors occur in Stasis that disrupt normal processing. The test scenarios: 1. The 'Babs' scenario: a. A channel is originated through ARI referencing a subscribed app (Babs) that was registered in Stasis during startup. b. After Stasis is started, the channel is then hungup. c. A check is made to ensure that the value of STASISSTATUS is SUCCESS. 2. The 'Bugs' scenario: a. A channel is originated through ARI referencing a subscribed app (BugsAlt) that was never registered in Stasis. b. A check is then made to ensure that the value of STASISSTATUS is FAILED. 3. The 'Buster' scenario: a. A channel is originated through ARI referencing a subscribed app (Buster) that was registered in Stasis during startup. b. While the channel from step 'a' is still active, the websocket is then disconnected out from underneath ARI. c. A new channel is originated through ARI, also referencing the subscribed app (Buster) that was registered in Stasis during startup. d. A check is then made to ensure that the value of STASISSTATUS is FAILED. ASTERISK-24802 Reported By: Kevin Harwell Change-Id: I0f7dadfd429bd30e9f07a531f47884d8c923fc13 --- A tests/rest_api/applications/stasisstatus/__init__.py A tests/rest_api/applications/stasisstatus/ari_client.py A tests/rest_api/applications/stasisstatus/configs/ast1/extensions.conf A tests/rest_api/applications/stasisstatus/monitor.py A tests/rest_api/applications/stasisstatus/observable_object.py A tests/rest_api/applications/stasisstatus/run-test A tests/rest_api/applications/stasisstatus/test-config.yaml A tests/rest_api/applications/stasisstatus/test_case.py A tests/rest_api/applications/stasisstatus/test_scenario.py M tests/rest_api/applications/tests.yaml 10 files changed, 1,447 insertions(+), 0 deletions(-) Approvals: Mark Michelson: Looks good to me, approved; Verified Matt Jordan: Looks good to me, but someone else must approve diff --git a/tests/rest_api/applications/stasisstatus/__init__.py b/tests/rest_api/applications/stasisstatus/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/rest_api/applications/stasisstatus/__init__.py diff --git a/tests/rest_api/applications/stasisstatus/ari_client.py b/tests/rest_api/applications/stasisstatus/ari_client.py new file mode 100644 index 0000000..e7e2eb3 --- /dev/null +++ b/tests/rest_api/applications/stasisstatus/ari_client.py @@ -0,0 +1,404 @@ +#!/usr/bin/env python +""" +Copyright (C) 2015, Digium, Inc. +Ashley Sanders <asand...@digium.com> + +This program is free software, distributed under the terms of +the GNU General Public License Version 2. +""" + +import sys +import logging +import uuid + +sys.path.append("lib/python") +sys.path.append("tests/rest_api/applications") + +from asterisk.ari import ARI, AriClientFactory +from stasisstatus.observable_object import ObservableObject +from twisted.internet import defer, reactor + +LOGGER = logging.getLogger(__name__) + + +class AriClient(ObservableObject): + """The ARI client. + + This class serves as a facade for ARI and AriClientFactory. It is + responsible for creating and persisting the connection state needed to + execute a test scenario. + """ + + def __init__(self, host, port, credentials, name='testsuite'): + """Constructor. + + Keyword Arguments: + host -- The [bindaddr] of the Asterisk HTTP web + server. + port -- The [bindport] of the Asterisk HTTP web + server. + credentials -- User credentials for ARI. A tuple. + E.g.: ('username', 'password'). + name -- The name of the app to register in Stasis via + ARI (optional) (default 'testsuite'). + """ + + + super(AriClient, self).__init__(name, ['on_channelcreated', + 'on_channeldestroyed', + 'on_channelvarset', + 'on_client_start', + 'on_client_stop', + 'on_stasisend', + 'on_stasisstart', + 'on_ws_open', + 'on_ws_closed']) + self.__ari = None + self.__factory = None + self.__ws_client = None + self.__channels = [] + self.__host = host + self.__port = port + self.__credentials = credentials + + def connect_websocket(self): + """Creates an AriClientFactory instance and connects to it.""" + + def wait_for_it(deferred=None): + """Waits for the client to reset before connecting the web socket. + + Keyword Arguments: + deferred -- The twisted.defer instance to use for + chaining callbacks (optional) + (default None). + """ + + msg = '{0} '.format(self) + + if not deferred: + deferred = defer.Deferred() + if not self.clean: + LOGGER.debug(msg + 'I\'m not so fresh so clean.') + reactor.callLater(1, wait_for_it, deferred) + else: + LOGGER.debug(msg + 'Connecting web socket.') + self.__ari = ARI(self.__host, userpass=self.__credentials) + self.__factory = AriClientFactory(receiver=self, + host=self.__host, + port=self.__port, + apps=self.name, + userpass=self.__credentials) + deferred.callback(self.__factory.connect()) + + self.__reset() + wait_for_it() + return + + def __delete_all_channels(self): + """Deletes all the channels.""" + + if len(self.__channels) == 0: + return + + if self.__ari is not None: + allow_errors = self.__ari.allow_errors + self.__ari.set_allow_errors(True) + channels = list().extend(self.__channels) + for channel in channels: + self.hangup_channel(channel) + self.__ari.set_allow_errors(allow_errors) + else: + del self.__channels[:] + return + + def disconnect_websocket(self): + """Disconnects the web socket.""" + + msg = '{0} '.format(self) + + if self.__ws_client is None: + info = 'Cannot disconnect; no web socket is connected.' + LOGGER.debug(msg + info) + return self + + if self.__ari is not None: + warning = 'Disconnecting web socket with an active ARI connection.' + LOGGER.warn(msg + warning) + + LOGGER.debug(msg + 'Disconnecting the web socket.') + self.__ws_client.transport.loseConnection() + return self + + def hangup_channel(self, channel_id): + """Deletes a channel. + + Keyword Arguments: + channel_id -- The id of the channel to delete. + + Returns: + The JSON response object from the DELETE to ARI. + + Raises: + ValueError + """ + + msg = '{0} '.format(self) + + if self.__ari is None: + msg += 'Cannot hangup channel; ARI instance has no value.' + raise ValueError(msg.format(self)) + + LOGGER.debug(msg + 'Deleting channel [{0}].'.format(channel_id)) + + try: + self.__channels.remove(channel_id) + except ValueError: + pass + + return self.__ari.delete('channels', channel_id) + + def on_channelcreated(self, message): + """Callback for the ARI 'ChannelCreated' event. + + Keyword Arguments: + message -- the JSON message + """ + + channel = message['channel']['id'] + if channel not in self.__channels: + self.__channels.append(channel) + + self.notify_observers('on_channelcreated', message) + + def on_channeldestroyed(self, message): + """Callback for the ARI 'ChannelDestroyed' event. + + Keyword Arguments: + message -- the JSON message + """ + + channel = message['channel']['id'] + try: + self.__channels.remove(channel) + except ValueError: + pass + + self.notify_observers('on_channeldestroyed', message) + + def on_channelvarset(self, message): + """Callback for the ARI 'ChannelVarset' event. + + Keyword Arguments: + message -- the JSON message + """ + + self.notify_observers('on_channelvarset', message) + + def on_client_start(self): + """Notifies the observers of the 'on_client_start' event.""" + + LOGGER.debug('{0} Client is started.'.format(self)) + self.notify_observers('on_client_start', None, True) + + def on_client_stop(self): + """Notifies the observers of the 'on_client_stop' event.""" + + LOGGER.debug('{0} Client is stopped.'.format(self)) + self.notify_observers('on_client_stop', None, True) + + def on_stasisend(self, message): + """Callback for the ARI 'StasisEnd' event + + Keyword Arguments: + message -- the JSON message + """ + + self.notify_observers('on_stasisend', message) + + def on_stasisstart(self, message): + """Callback for the ARI 'StasisEnd' event + + Keyword Arguments: + message -- the JSON message + """ + + self.notify_observers('on_stasisstart', message) + + def on_ws_closed(self, ws_client): + """Callback for AriClientProtocol 'onClose' handler. + + Keyword Arguments: + ws_client -- The AriClientProtocol object that raised + the event. + """ + + LOGGER.debug('{0} WebSocket connection closed.'.format(self)) + self.__ws_client = None + self.notify_observers('on_ws_closed', None) + + def on_ws_event(self, message): + """Callback for AriClientProtocol 'onMessage' handler. + + Keyword Arguments: + message -- The event payload. + """ + + LOGGER.debug("{0} In on_ws_event; message={1}".format(self, message)) + + event = 'on_{0}'.format(message.get('type').lower()) + + if event == 'on_ws_open' or event == 'on_ws_closed': + return + + callback = getattr(self, event, None) + if callback and callable(callback): + callback(message) + self.notify_observers(event, message) + + def on_ws_open(self, ws_client): + """Callback for AriClientProtocol 'onOpen' handler. + + Keyword Arguments: + ws_client -- The AriClientProtocol object that raised + the event. + """ + + LOGGER.debug('{0} WebSocket connection opened.'.format(self)) + self.__ws_client = ws_client + self.notify_observers('on_ws_open', None) + self.on_client_start() + + def originate(self, endpoint, app=None): + """Originates a channel. + + Keyword Arguments: + endpoint -- The endpoint to use for the ARI request. + app -- The name of the Stasis app (optional) + (default None). + + Returns: + The JSON response object from the POST to ARI. + + Raises: + ValueError + """ + + msg = '{0} '.format(self) + + if self.__ari is None: + msg += 'Cannot originate channel; ARI instance has no value.' + raise ValueError(msg) + + channel = dict() + if app is not None: + channel['app'] = app + channel['channelId'] = str(uuid.uuid4()) + channel['endpoint'] = endpoint + + msg += 'Originating channel [{0}].' + LOGGER.debug(msg.format(channel['channelId'])) + return self.__ari.post('channels', **channel) + + def __reset(self): + """Resets the AriClient to its initial state. + + Returns: + A twisted.defer instance. + """ + + if not self.clean: + LOGGER.debug('{0} About to reset my state!'.format(self)) + self.__tear_down() + return + + def start(self): + """Starts the client.""" + + LOGGER.debug('{0} Starting client connections.'.format(self)) + self.connect_websocket() + + def stop(self): + """Stops the client.""" + + LOGGER.debug('{0} Stopping client connections.'.format(self)) + self.suspend() + self.__reset() + + def __tear_down(self): + """Tears down the channels and web socket.""" + + def wait_for_it(deferred=None, run=0): + """Disposes each piece, one at a time. + + + The first run (run=0) initialized the deferred and kicks of + the process to destroy all of our channels. + + The second run (run=1) waits for all the channels to be + destroyed then kicks off the process to disconnect the web socket. + + The third run (run=2) waits for the web socket to + disconnect then cleans up the remaining state variables. + + Keyword Arguments: + deferred -- The twisted.defer instance to use for + chaining callbacks (optional) + (default None). + run -- The current phase of tear down: + 0=Entry phase + 1=Waiting for ARI to destroy all channels + 2=Calls ARI to Disconnects the web socket + 3=Waiting for ARI to disconnect the web + socket + """ + + msg = '{0} '.format(self) + + if not deferred: + deferred = defer.Deferred() + self.suspend() + if run == 0: + LOGGER.debug(msg + 'Tearing down active connections.') + self.__delete_all_channels() + reactor.callLater(2, wait_for_it, deferred, 1) + elif run == 1: + if len(self.__channels) > 0: + msg += 'Waiting for channels to be destroyed.' + LOGGER.debug(msg) + reactor.callLater(2, wait_for_it, deferred, 1) + reactor.callLater(2, wait_for_it, deferred, 2) + elif run == 2: + LOGGER.debug(msg + 'Disconnecting web socket.') + self.__ari = None + self.__factory = None + self.disconnect_websocket() + reactor.callLater(2, wait_for_it, deferred, 3) + elif run == 3: + if self.__ws_client is not None: + msg += 'Waiting for web socket to be destroyed.' + LOGGER.debug(msg) + reactor.callLater(2, wait_for_it, deferred, 3) + else: + LOGGER.debug(msg + 'Client successfully torn down.') + reactor.callLater(0, self.on_client_stop) + reactor.callLater(2, self.reset_registrar) + deferred.callback(self.resume()) + wait_for_it() + return + + @property + def clean(self): + """Returns True if the client has no orphaned connections + needing to be torn down. False otherwise.""" + + if len(self.__channels) == 0: + LOGGER.debug('{0} No channels!'.format(self)) + if self.__ws_client is None: + LOGGER.debug('{0} No ws_client!'.format(self)) + if self.__ari is None: + LOGGER.debug('{0} No ari!'.format(self)) + if self.__factory is None: + LOGGER.debug('{0} No factory!'.format(self)) + LOGGER.debug('{0} I\'m clean!'.format(self)) + return True + return False diff --git a/tests/rest_api/applications/stasisstatus/configs/ast1/extensions.conf b/tests/rest_api/applications/stasisstatus/configs/ast1/extensions.conf new file mode 100644 index 0000000..0bc27ab --- /dev/null +++ b/tests/rest_api/applications/stasisstatus/configs/ast1/extensions.conf @@ -0,0 +1,28 @@ +[globals] + +[Acme] + +exten => _[Bb]abs,1,NoOp() + same => n,GoSub(subComicDispenser,1(${EXTEN})) + same => n,Hangup() + +exten => _[Bb]ugs,1,NoOp() + same => n,GoSub(subComicDispenser,1(${EXTEN})) + same => n,Hangup() + +exten => _[Bb]ugs[Aa]lt,1,NoOp() + same => n,GoSub(subComicDispenser,1(${EXTEN})) + same => n,Hangup() + +exten => _[Bb]uster,1,NoOp() + same => n,GoSub(subComicDispenser,1(${EXTEN})) + same => n,Hangup() + +exten => _[Bb]uster[Aa]lt,1,NoOp() + same => n,GoSub(subComicDispenser,1(Buster)) + same => n,Hangup() + +exten => subComicDispenser,1,NoOp() + same => n,Answer() + same => n,Stasis(${ARG1}) + same => n,Return() \ No newline at end of file diff --git a/tests/rest_api/applications/stasisstatus/monitor.py b/tests/rest_api/applications/stasisstatus/monitor.py new file mode 100644 index 0000000..4504ac3 --- /dev/null +++ b/tests/rest_api/applications/stasisstatus/monitor.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python +""" +Copyright (C) 2015, Digium, Inc. +Ashley Sanders <asand...@digium.com> + +This program is free software, distributed under the terms of +the GNU General Public License Version 2. +""" + +import sys +import logging + +sys.path.append("lib/python") +sys.path.append("tests/rest_api/applications") + +from stasisstatus.observable_object import ObservableObject + +LOGGER = logging.getLogger(__name__) + + +class ChannelVariableMonitor(ObservableObject): + """Monitors the system for state changes for a given channel variable.""" + + def __init__(self, ami, variable, name): + """Constructor. + + Keyword Arguments: + ami -- The AMI instance to monitor. + variable -- The name of the channel variable to monitor + (optional) (default None). + name -- The name of this ChannelVariableMonitor + instance. + """ + + super(ChannelVariableMonitor, self).__init__(name, + ['on_value_changed']) + self.__ami = ami + self.__captured_value = None + self.__channel_variable = variable + self.__monitored_channel = None + + self.__ami.registerEvent('VarSet', self.__on_ami_varset) + self.__ami.registerEvent('UserEvent', self.__on_ami_user_event) + + def __log_event(self, handler, event_data): + """Logs event messages. + + Keyword Arguments: + handler -- The name of the event handler. + event_data -- The event payload or message. + """ + + LOGGER.debug('{0} In {1}; event data={2}'.format(self, + handler, + event_data)) + + def __on_ami_user_event(self, ami, message): + """Handles the AMI 'UserEvent' event. + + Keyword Arguments: + ami -- The AMI instance. + message -- The event payload. + """ + + if message['uniqueid'] != self.__monitored_channel: + return + + if message['userevent'] != 'StasisStatus': + return + + self.captured_value = message['value'] + + def __on_ami_varset(self, ami, message): + """Handles the AMI 'VarSet' event. + + Keyword Arguments: + ami -- The AMI instance. + message -- The event payload. + """ + + self.__log_event('__on_ami_varset', message) + + msg = '{0} '.format(self) + + if self.suspended: + LOGGER.debug(msg + 'Monitoring is suspended.') + return + + if message['uniqueid'] != self.__monitored_channel: + return + if message['variable'] != self.__channel_variable: + return + + self.captured_value = message['value'] + + def start(self, channel): + """Tells the monitor to start monitoring for the given channel. + + Keyword Arguments: + channel -- The id of the channel to use for monitoring. + """ + + LOGGER.debug('{0} Monitoring starting for channel[{1}]'.format(self, + channel)) + self.__monitored_channel = channel + self.activate() + + @property + def captured_value(self): + """The current value captured for the monitored channel variable.""" + + return self.__captured_value + + @captured_value.setter + def captured_value(self, value): + """Sets the captured value.""" + + self.__captured_value = value + LOGGER.debug('{0} {1}={2}.'.format(self, + self.__channel_variable, + self.__captured_value)) + self.notify_observers('on_value_changed', None, False) + diff --git a/tests/rest_api/applications/stasisstatus/observable_object.py b/tests/rest_api/applications/stasisstatus/observable_object.py new file mode 100644 index 0000000..20f62f0 --- /dev/null +++ b/tests/rest_api/applications/stasisstatus/observable_object.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +""" +Copyright (C) 2015, Digium, Inc. +Ashley Sanders <asand...@digium.com> + +This program is free software, distributed under the terms of +the GNU General Public License Version 2. +""" + +import sys +import logging + +sys.path.append("lib/python") +sys.path.append("tests/rest_api/applications") + +LOGGER = logging.getLogger(__name__) + +class ObservableObject(object): + """Definition for an observable object.""" + + def __init__(self, name, events): + """Constructor. + + Keyword Arguments: + name -- The name of this ObservableObject. + events -- The events available for observing. + """ + + self.__name = name + self.__registrar = dict() + self.__suspended = 0 + + for event in events: + self.__registrar[event] = list() + + def __format__(self, format_spec): + """Overrides default format handling for 'self'.""" + + return self.__class__.__name__ + '[' + self.name + ']:' + + def activate(self): + """Activates the object notification system.""" + + self.__suspended = 0 + + def notify_observers(self, event, message, notify_on_suspend=False): + """Starts the chain of invocations for the callbacks. + + Keyword Arguments: + event -- The name of the event being raised. + message -- The event payload. + notify_on_suspend -- Whether or not to override suspended + notification logic (optional) (default False). + + Raises: + ValueError + """ + + msg = '{0} '.format(self) + + if self.suspended and not notify_on_suspend: + LOGGER.debug(msg + " Suspended; cannot notify observers.") + return + + if not self.__validate(event): + error = msg + 'Could not notify observers; Validation failed.' + raise ValueError(error) + + for callback in self.__registrar[event]: + LOGGER.debug(msg + 'Invoking {0}'.format(callback)) + callback(self, message) + return + + def reset_registrar(self): + """Resets the registrar to its initial, empty state. + + Note: This will reset the entire observer registrar. + """ + + msg = '{0} '.format(self) + + LOGGER.debug(msg + 'Resetting the observer registrar') + for event in self.__registrar: + del self.__registrar[event][:] + LOGGER.debug(msg + 'Reset the observer registrar.') + return + + def register_observers(self, event, observers): + """Registers an observer with the list of observers. + + Keyword Arguments: + event -- The event to observe. + observers -- A list of callable observers or a single + callable observer. + + Raises: + TypeError + ValueError + """ + + msg = '{0} '.format(self) + error = msg + 'Could not register observers' + + if observers is None: + error += ' for event [{0}]; [Observers] is None.'.format(event) + return + + if not self.__validate(event): + error += '; Validation failed.' + raise ValueError(error) + + cache = list() + if callable(observers): + cache.append(observers) + elif isinstance(observers, list): + cache.extend(observers) + else: + msg += 'Cannot register observer {0} with registrar; [{1}] \ + is an unsupported type.' + raise TypeError(msg.format(observers, + observers.__class__.__name__)) + + if self.__registrar[event] is None: + msg += 'Instantiating the observers for event {0}.'.format(event) + LOGGER.debug(msg) + self.__registrar[event] = list() + self.__registrar[event].extend(cache) + return + + def resume(self): + """Resumes monitoring.""" + + self.__suspended = max(0, self.__suspended - 1) + + def suspend(self): + """Suspends monitoring.""" + + self.__suspended += 1 + + def __validate(self, event): + """Validates the parameters for value. + + Validates that a given event is registered. + + event -- The event to validate. + + Returns: + True if the event is registered, False otherwise. + """ + + valid = None + error = '{0} Cannot continue; '.format(self) + + if not event: + valid = False + reason = 'No value provided for [%r]' % event + LOGGER.warn(error + reason) + elif event not in self.__registrar: + valid = False + reason = 'Registrar does not contain an entry for the \ + event [{1}]'.format(event) + LOGGER.warn(error + reason) + + return valid if valid is not None else True + + @property + def name(self): + """The friendly name for this instance.""" + + return self.__name + + @property + def suspended(self): + """Flag indicating that the scenario is being torn down.""" + + return self.__suspended > 0 diff --git a/tests/rest_api/applications/stasisstatus/run-test b/tests/rest_api/applications/stasisstatus/run-test new file mode 100644 index 0000000..2d981a2 --- /dev/null +++ b/tests/rest_api/applications/stasisstatus/run-test @@ -0,0 +1,246 @@ +#!/usr/bin/env python +""" +Copyright (C) 2015, Digium, Inc. +Ashley Sanders <asand...@digium.com> + +This program is free software, distributed under the terms of +the GNU General Public License Version 2. +""" + +import sys +import logging + +sys.path.append("lib/python") +sys.path.append("tests/rest_api/applications") + +from stasisstatus.ari_client import AriClient +from stasisstatus.monitor import ChannelVariableMonitor +from stasisstatus.test_case import StasisStatusTestCase +from stasisstatus.test_scenario import TestScenario + +LOGGER = logging.getLogger(__name__) + + +def build_scenarios(ami, host, port, credentials): + """Builds the scenarios. + + Keyword Arguments: + ami -- The AMI instance for this factory. + host -- The [bindaddr] of the Asterisk HTTP web + server. + port -- The [bindport] of the Asterisk HTTP web + server. + credentials -- User credentials for ARI. + A tuple. E.g. ('username', 'password'). + + Returns: + A list of TestScenario objects. + """ + + scenarios = list() + + for name in ['Babs', 'Bugs', 'Buster']: + client = AriClient(host, + port, + credentials, + name) + monitor = ChannelVariableMonitor(ami, + 'STASISSTATUS', + name) + scenario = globals()[name + "TestScenario"](ami, client, monitor, name) + scenarios.append(scenario) + return scenarios + + +class BabsTestScenario(TestScenario): + """The 'Babs' TestScenario. + + This scenario tests for the case where a call is originated under + normal operating conditions and then hungup to determine if Stasis + correctly assigns STASISSTATUS=SUCCESS. + """ + + def __init__(self, ami, ari_client, monitor, name): + """Constructor. + + Keyword Arguments: + ami -- The AMI instance for this TestScenario. + ari_client -- The AriClient to use to for executing the + TestStrategy commands. + monitor -- The ChannelVariableMonitor instance for this + TestScenario. + name -- The name for this TestScenario instance. + """ + + super(BabsTestScenario, self).__init__(ami, + ari_client, + monitor, + 'SUCCESS', + name) + + self.__channel = None + self.__stasis_started = False + self.ari_client.register_observers('on_stasisstart', + self.__on_stasisstart) + + def __on_stasisstart(self, sender, message): + """Handles the AriClient 'on_stasisstart' event. + + Keyword Arguments: + sender -- The object that raised the event. + message -- The event payload. + """ + + if self.__stasis_started: + return + + channel = message['channel']['id'] + if channel == self.__channel: + self.__stasis_started = True + self.monitor.start(self.__channel) + self.ari_client.hangup_channel(self.__channel) + + def run_strategy(self): + """Implements the run_strategy from the base class.""" + + msg = '{0} '.format(self) + LOGGER.debug(msg + 'About to originate a channel with an app that has \ + been registered in Stasis.') + app = self.name + endpoint = 'LOCAL/{0}@Acme'.format(app) + resp = self.ari_client.originate(endpoint, app) + self.__channel = resp.json()['id'] + LOGGER.debug(msg + 'Response was [%r].' % resp) + + +class BugsTestScenario(TestScenario): + """The 'Bugs' TestScenario. + + This scenario tests for the case where a call is originated for an + app that was never registered in Stasis to determine if Stasis correctly + identifies this as a failure and assigns STASISSTATUS=FAILED. + """ + + def __init__(self, ami, ari_client, monitor, name): + """Constructor. + + Keyword Arguments: + ami -- The AMI instance for this TestScenario. + ari_client -- The AriClient to use to for executing the + TestStrategy commands. + monitor -- The ChannelVariableMonitor instance for this + TestScenario. + name -- The name for this TestScenario instance. + """ + + super(BugsTestScenario, self).__init__(ami, + ari_client, + monitor, + 'FAILED', + name) + + def run_strategy(self): + """Implements the run_strategy from the base class.""" + + msg = '{0} '.format(self) + LOGGER.debug('About to originate a channel with an app that was \ + never registered in Stasis.') + app = self.name + 'alt' + endpoint = 'LOCAL/{0}@Acme'.format(app) + resp = self.ari_client.originate(endpoint, app) + self.monitor.start(resp.json()['id']) + LOGGER.debug(msg + 'Response was [%r].' % resp) + + +class BusterTestScenario(TestScenario): + """The 'Buster' TestScenario. + + This scenario tests for the case where a Stasis app that was + registered when channel A was originated, but is no longer registered + when channel B is originated, to determines if Stasis correctly identifies + this as a failure and assigns STASISSTATUS=FAILED. + """ + + def __init__(self, ami, ari_client, monitor, name): + """Constructor. + + Keyword Arguments: + ami -- The AMI instance for this TestScenario. + ari_client -- The AriClient to use to for executing the + TestStrategy commands. + monitor -- The ChannelVariableMonitor instance for this + TestScenario. + name -- The name for this TestScenario instance. + """ + + super(BusterTestScenario, self).__init__(ami, + ari_client, + monitor, + 'FAILED', + name) + + self.__ws_closed = False + self.ari_client.register_observers('on_ws_closed', + self.__on_ws_closed) + + def __on_ws_closed(self, sender, message): + """Handles the AriClient 'on_ws_closed' event. + + Keyword Arguments: + sender -- The object that raised the event. + message -- The event payload. + """ + + msg = '{0} '.format(self) + + if self.suspended: + LOGGER.debug(msg + 'Scenario is suspended.') + return + + if self.__ws_closed: + LOGGER.warn(msg + 'About to run duplicate scenario step.') + + self.__ws_closed = True + + msg = '{0} '.format(self) + LOGGER.debug(msg + 'In {0}; message={1}'.format('__on_ws_closed', + message)) + LOGGER.debug(msg + 'About to originate a channel after the web socket \ + has been disconnected.') + app = self.name + endpoint = 'LOCAL/{0}@Acme'.format(app) + resp = self.ari_client.originate(endpoint, app) + self.monitor.start(resp.json()['id']) + LOGGER.debug(msg + 'Response was [%r].' % resp) + + def run_strategy(self): + """Implements the run_strategy from the base class.""" + + msg = '{0} '.format(self) + LOGGER.debug(msg + 'About to originate a channel with an app that has \ + been registered in Stasis.') + + app = self.name + 'alt' + endpoint = 'LOCAL/{0}@Acme'.format(app) + resp = self. ari_client.originate(endpoint, app) + LOGGER.debug(msg + 'Response was [%r].' % resp) + LOGGER.debug(msg + 'About to disconnect the web socket.') + self.ari_client.disconnect_websocket() + + +def main(): + """Entry point for the test. + + Returns: + 0 if the test passed, 1 otherwise. + """ + + test = StasisStatusTestCase(build_scenarios) + + if test.passed: + return 0 + return 1 + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/tests/rest_api/applications/stasisstatus/test-config.yaml b/tests/rest_api/applications/stasisstatus/test-config.yaml new file mode 100644 index 0000000..1574959 --- /dev/null +++ b/tests/rest_api/applications/stasisstatus/test-config.yaml @@ -0,0 +1,32 @@ +testinfo: + summary: | + Tests that Stasis correctly applies the STASISSTATUS channel + variable. + description: | + The test exercises Stasis in three ways to determine if is + functioning as advertised. + - For the first case (Babs), a channel is originated under normal + conditions and then the channel is hungup. For this case, the + test verifies that Stasis correctly assigns SUCCESS to STASISSTATUS. + - For the second case (Bugs), a channel is originated using an app + that was never registered with Stasis. The test verifies that Stasis + correctly assigns FAILED to STASISSTATUS. + - For the third case (Buster), a channel is made under normal + conditions, but, before the channel is hungup and while the channel is + still active, the websocket is disconnected. A channel is then + originated using the app that was just unregistered. For this case, + the test verifies that Stasis correctly assigns FAILED to STASISSTATUS. + +properties: + minversion: '13.4.0' + dependencies: + - python: 'autobahn.websocket' + - python: 'requests' + - python: 'twisted' + - python: 'starpy' + - asterisk: 'res_ari_applications' + - asterisk: 'res_ari_channels' + tags: + - ARI + issues: + - jira: 'ASTERISK-24802' diff --git a/tests/rest_api/applications/stasisstatus/test_case.py b/tests/rest_api/applications/stasisstatus/test_case.py new file mode 100644 index 0000000..1e8c1dd --- /dev/null +++ b/tests/rest_api/applications/stasisstatus/test_case.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +""" +Copyright (C) 2015, Digium, Inc. +Ashley Sanders <asand...@digium.com> + +This program is free software, distributed under the terms of +the GNU General Public License Version 2. +""" + +import sys +import logging + +sys.path.append("lib/python") +sys.path.append("tests/rest_api/applications") + +from asterisk.test_case import TestCase +from twisted.internet import reactor, defer + +LOGGER = logging.getLogger(__name__) + + +class StasisStatusTestCase(TestCase): + """The test case. + + This class serves as a harness for the test scenarios. It manages the + life-cycle of the the objects needed to execute the test plan. + """ + + def __init__(self, scenario_builder): + """Constructor. + + scenario_builder -- The builder to use for constructing + the test scenarios. + + """ + + super(StasisStatusTestCase, self).__init__() + + self.create_asterisk() + + self.__host = self.ast[0].host + self.__port = 8088 + self.__credentials = ('testsuite', 'testsuite') + + self.__scenarios = list() + self.__iterator = None + self.__builder = scenario_builder + + reactor.run() + return + + def __format__(self, format_spec): + """Overrides default format handling for 'self'.""" + + return self.__class__.__name__ + ':' + + def ami_connect(self, ami): + """Handler for the AMI connect event. + + Keyword Arguments: + ami -- The AMI instance that raised this event. + """ + + super(StasisStatusTestCase, self).ami_connect(ami) + self.__initialize_scenarios(ami) + + def __get_next_scenario(self): + """ Gets the next scenario from the list.""" + + scenario = None + try: + scenario = self.__iterator.next() + except StopIteration: + pass + return scenario + + def __initialize_scenarios(self, ami): + """Initializes the scenarios. + + Keyword Arguments: + ami -- The AMI instance for this test. + """ + + deferred = defer.Deferred() + self.__scenarios = self.__builder(ami, + self.__host, + self.__port, + self.__credentials) + self.__iterator = iter(self.__scenarios) + + for scenario in self.__scenarios: + deferred.addCallback(self.__try_run_scenario) + + deferred.callback(self.__get_next_scenario()) + + def on_reactor_timeout(self): + """Called when the reactor times out""" + + LOGGER.warn("{0} Reactor is timing out. Setting test to FAILED.") + self.set_passed(False) + + def __on_scenario_complete(self, sender, message): + """Queries the scenarios to determine if it is time to shut down + the test. + + Keyword Arguments: + sender -- The object that raised the event. + message -- The event payload. + """ + + sender.stop() + for scenario in self.__scenarios: + if not scenario.finished: + return + + LOGGER.debug('{0} Test case execution is complete.'.format(self)) + self.stop_reactor() + + def run(self): + """Executes the test case. + + Tries to set up the state needed by the test. If successful, the test + is executed and then the test state is torn down.""" + + LOGGER.debug('{0} Starting test case execution.'.format(self)) + super(StasisStatusTestCase, self).run() + + self.create_ami_factory() + + def stop_reactor(self): + """Clean up actions to perform prior to shutting down the reactor. + + Queries the scenarios for their pass/fail state to determine + overall pass/fail state for the test. Then, destroys the test state + before stopping the reactor.""" + + LOGGER.debug('{0} Stopping reactor.'.format(self)) + for scenario in self.__scenarios: + self.set_passed(scenario.passed) + if not scenario.clean: + scenario.stop() + super(StasisStatusTestCase, self).stop_reactor() + LOGGER.debug('{0} Reactor stopped.'.format(self)) + + def __try_run_scenario(self, scenario): + """Starts the stasis scenario. + + Keyword Arguments: + scenario -- The scenario to try to start. + + Returns: + If the self.__iterator has not yet finished traversing the list, + returns the next scenario in self.__scenarios. + + Otherwise,returns None. + """ + + msg = '{0} {1} scenario [{2}]' + + if scenario is not None: + LOGGER.debug((msg + '.').format(self, + 'Starting', + scenario.name)) + scenario.start(self.__on_scenario_complete) + return self.__get_next_scenario() + + msg = msg + '; {3}.' + LOGGER.warn(msg.format(self, + 'Cannot connect', + None, + 'scenario has not been assigned a value.')) + return None diff --git a/tests/rest_api/applications/stasisstatus/test_scenario.py b/tests/rest_api/applications/stasisstatus/test_scenario.py new file mode 100644 index 0000000..57e16d4 --- /dev/null +++ b/tests/rest_api/applications/stasisstatus/test_scenario.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python +""" +Copyright (C) 2015, Digium, Inc. +Ashley Sanders <asand...@digium.com> + +This program is free software, distributed under the terms of +the GNU General Public License Version 2. +""" + + +import sys +import logging + +sys.path.append("lib/python") +sys.path.append("tests/rest_api/applications") + +from abc import ABCMeta, abstractmethod +from stasisstatus.observable_object import ObservableObject + +LOGGER = logging.getLogger(__name__) + + +class TestScenario(ObservableObject): + """The test scenario. + + This class is responsbile for presenting a facade around the + AriClient and TestStrategy objects. + """ + + __metaclass__ = ABCMeta + + def __init__(self, ami, ari_client, monitor, expected, name='testsuite'): + """Constructor. + + Keyword Arguments: + ami -- The AMI instance for this TestScenario. + ari_client -- The AriClient to use to for executing the + TestStrategy commands. + monitor -- The ChannelVariableMonitor instance for this + TestScenario. + expected -- The expected value for this TestScenario. + name -- The name for this TestScenario instance + (optional) (default 'testsuite'). + """ + + super(TestScenario, self).__init__(name, ['on_complete', + 'on_stop']) + self.__ami = ami + self.__ari_client = ari_client + self.__actual_value = None + self.__expected_value = expected + self.__monitor = monitor + self.__passed = None + + self.__monitor.suspend() + self.ari_client.register_observers('on_client_start', + self.on_ari_client_start) + self.ari_client.register_observers('on_client_stop', + self.on_ari_client_stop) + self.monitor.register_observers('on_value_changed', + self.on_monitor_on_value_changed) + + def compile_results(self): + """Compiles the results after executing the test strategy.""" + + if self.finished: + return + + LOGGER.debug('{0} Compiling the results.'.format(self)) + + passed = self.actual_value == self.expected_value + + LOGGER.debug('{0} Test strategy is complete.'.format(self)) + LOGGER.debug('{0} Test values: Expected [{1}]; Actual [{2}].' \ + .format(self, self.expected_value, self.actual_value)) + LOGGER.debug('{0} Test results: Test {1}.' \ + .format(self, 'Passed' if passed else 'Did Not Pass')) + + self.passed = passed + + def finish_scenario(self): + """Performs the final tasks.""" + + if self.finished: + LOGGER.debug('{0} Scenario is already finished.'.format(self)) + + self.suspend() + LOGGER.debug('{0} Finishing the scenario.'.format(self)) + self.compile_results() + + def on_ari_client_start(self, sender, message): + """Handles the AriClient on_client_start event. + + Keyword Arguments: + sender -- The object that raised the event. + message -- The event payload. + """ + + LOGGER.debug('{0} AriClient started successfully.'.format(self)) + if not self.suspended: + LOGGER.debug('{0} Running scenario.'.format(self)) + self.run_strategy() + + def on_ari_client_stop(self, sender, message): + """Handler for the AriClient on_client_stop event. + + Keyword Arguments: + sender -- The object that raised the event. + message -- The event payload. + """ + + LOGGER.debug('{0} Scenario has stopped.'.format(self)) + self.notify_observers('on_stop', None, True) + + def on_monitor_on_value_changed(self, sender, message): + """Handles the ChannelVariableMonitor 'on_value_changed' event. + + Keyword Arguments: + sender -- The object that raised the event. + message -- The event payload. + """ + + msg = '{0} '.format(self) + + if self.suspended: + LOGGER.debug(msg + 'Scenario is suspended.') + return + + self.actual_value = self.monitor.captured_value + + if self.actual_value == self.expected_value: + self.suspend() + LOGGER.debug('{0} Looks like we made it.'.format(self)) + self.finish_scenario() + + def resume(self): + """Overrides the default behavior of resetting the value of the + suspended flag.""" + + if not self.suspended: + return + + super(TestScenario, self).resume() + self.__monitor.resume() + + @abstractmethod + def run_strategy(self): + """Runs the Test Scenario.""" + + return + + def start(self, on_scenario_complete=None): + """Starts the test scenario. + + Keyword Arguments: + on_scenario_complete -- A callback (or a list of callbacks) to invoke + after the scenario completes (optional) + (default None). + """ + + LOGGER.debug('{0} Starting scenario.'.format(self)) + self.register_observers('on_complete', on_scenario_complete) + self.ari_client.start() + + def stop(self, on_scenario_stop=None): + """Stops the scenario execution and tears down its state. + + Keyword Arguments: + on_scenario_stop -- A callback (or a list of callbacks) to invoke + after the scenario stops (optional) + (default None). + """ + + if self.ari_client.suspended: + return + + LOGGER.debug('{0} Stopping the scenario.'.format(self)) + self.register_observers('on_stop', on_scenario_stop) + self.suspend() + self.ari_client.stop() + + def suspend(self): + """Overrides the default behavior of setting the value of the + suspended flag.""" + + if self.suspended: + return + + super(TestScenario, self).suspend() + self.__monitor.suspend() + + @property + def actual_value(self): + """The actual value for this TestScenario.""" + + return self.__actual_value + + @actual_value.setter + def actual_value(self, value): + """Sets the actual value for this TestScenario.""" + + self.__actual_value = value + + @property + def ami(self): + """The AMI instance for this TestScenario.""" + + return self.__ami + + @property + def ari_client(self): + """The AriClient instance for this TestScenario.""" + + return self.__ari_client + + @property + def clean(self): + """Flag indicating that this scenario has been torn down.""" + + return self.ari_client.clean + + @property + def expected_value(self): + """The expected value for this TestScenario.""" + + return self.__expected_value + + @property + def finished(self): + """Whether or not the strategy for this scenario has completed + execution. + + Returns: + True if the strategy has completed execution, False otherwise. + """ + + return self.__passed is not None + + @property + def monitor(self): + """The ChannelVariableMonitor instance.""" + + return self.__monitor + + @property + def passed(self): + """The state of the strategy. + + Returns: + None if the test strategy has not completed. Else, True if the test + strategy was successful, False otherwise. + """ + + return False if not self.finished else self.__passed + + @passed.setter + def passed(self, value): + """Safely set the passed variable for this scenario.""" + + if self.__passed is False: + return + + self.__passed = value + self.notify_observers('on_complete', None, True) + return diff --git a/tests/rest_api/applications/tests.yaml b/tests/rest_api/applications/tests.yaml index 3a9c104..5d5c4be 100644 --- a/tests/rest_api/applications/tests.yaml +++ b/tests/rest_api/applications/tests.yaml @@ -6,3 +6,4 @@ - test: 'subscribe-device-state' - test: 'double-subscribe-device-state' - dir: 'channel-subscriptions' + - test: 'stasisstatus' -- To view, visit https://gerrit.asterisk.org/18 To unsubscribe, visit https://gerrit.asterisk.org/settings Gerrit-MessageType: merged Gerrit-Change-Id: I0f7dadfd429bd30e9f07a531f47884d8c923fc13 Gerrit-PatchSet: 10 Gerrit-Project: testsuite Gerrit-Branch: master Gerrit-Owner: Ashley Sanders <asand...@digium.com> Gerrit-Reviewer: Ashley Sanders <asand...@digium.com> Gerrit-Reviewer: Mark Michelson <mmichel...@digium.com> Gerrit-Reviewer: Matt Jordan <mjor...@digium.com> -- _____________________________________________________________________ -- Bandwidth and Colocation Provided by http://www.api-digital.com -- asterisk-dev mailing list To UNSUBSCRIBE or update options visit: http://lists.digium.com/mailman/listinfo/asterisk-dev