Add implementation for the Planner global support. Also add unit tests for the execution engine.
Caveat: the global support is still single-threaded with the execution engine. I have an idea of how to make it multi-threaded, but I'd like to push this simpler prototype out first. Signed-off-by: James Ren <[email protected]> --- /dev/null 2009-12-17 12:29:38.000000000 -0800 +++ autotest/frontend/migrations/056_planner_global_support.py 2010-04-06 14:05:56.000000000 -0700 @@ -0,0 +1,19 @@ +UP_SQL = """ +CREATE TABLE planner_test_configs_skipped_hosts ( + testconfig_id INT NOT NULL, + host_id INT NOT NULL, + PRIMARY KEY (testconfig_id, host_id) +) ENGINE = InnoDB; + +ALTER TABLE planner_test_configs_skipped_hosts +ADD CONSTRAINT planner_test_configs_skipped_hosts_testconfig_ibfk +FOREIGN KEY (testconfig_id) REFERENCES planner_test_configs (id); + +ALTER TABLE planner_test_configs_skipped_hosts +ADD CONSTRAINT planner_test_configs_skipped_hosts_host_ibfk +FOREIGN KEY (host_id) REFERENCES afe_hosts (id); +""" + +DOWN_SQL = """ +DROP TABLE IF EXISTS planner_test_configs_skipped_hosts; +""" --- autotest/frontend/planner/execution_engine.py 2010-04-06 14:05:56.000000000 -0700 +++ autotest/frontend/planner/execution_engine.py 2010-04-06 14:05:56.000000000 -0700 @@ -1,7 +1,7 @@ import time, logging from autotest_lib.frontend.afe import model_attributes as afe_model_attributes from autotest_lib.frontend.shared import rest_client -from autotest_lib.frontend.planner import model_attributes +from autotest_lib.frontend.planner import model_attributes, support from autotest_lib.server import frontend @@ -133,8 +133,19 @@ finalized=False) for run in runs: logging.info('finalizing test run %s', run) - if run['status'] == Status.FAILED: - self._planner_rpc.run('modify_host', id=run['host'], + + controller = support.TestPlanController( + machine=run['host']['host'], + test_alias=run['test_job']['test_config']['alias']) + self._run_execute_after(controller, tko_test_id=run['tko_test'], + success=(run['status'] == Status.PASSED)) + + if controller._fail: + raise NotImplemented('TODO: implement forced failure') + + failed = (run['status'] == Status.FAILED or controller._fail) + if failed and not controller._unblock: + self._planner_rpc.run('modify_host', id=run['host']['id'], blocked=True) self._planner_rpc.run('modify_test_run', id=run['id'], finalized=True) @@ -162,19 +173,36 @@ return True for config in next_configs['next_configs']: + config_id = config['next_test_config_id'] + controller = support.TestPlanController( + machine=config['host'], + test_alias=config['next_test_config_alias']) + self._run_execute_before(controller) + if controller._skip: + self._planner_rpc.run('skip_test', test_config_id=config_id, + hostname=config['host']) + continue + self._run_job(hostname=config['host'], - test_config_id=config['next_test_config_id']) + test_config_id=config_id, + cleanup_before_job=controller._reboot_before, + cleanup_after_job=controller._reboot_after, + run_verify=controller._run_verify) return False - def _run_job(self, hostname, test_config_id): + def _run_job(self, hostname, test_config_id, cleanup_before_job, + cleanup_after_job, run_verify): test_config = self._planner_rpc.run('get_test_config', id=test_config_id) info = self._afe_rest.execution_info.get().execution_info info['control_file'] = test_config['control_file']['contents'] info['is_server'] = test_config['is_server'] + info['cleanup_before_job'] = cleanup_before_job + info['cleanup_after_job'] = cleanup_after_job + info['run_verify'] = run_verify atomic_group_class = self._afe_rest.labels.get( name=self._label_name).members[0].get().atomic_group_class.href @@ -199,3 +227,31 @@ plan_id=self._plan_id, test_config_id=test_config_id, afe_job_id=job.get().id) + + + def _run_execute_before(self, controller): + """ + Execute the global support's execute_before() for the plan + """ + self._run_global_support(controller, 'execute_before') + + + def _run_execute_after(self, controller, tko_test_id, success): + """ + Execute the global support's execute_after() for the plan + """ + self._run_global_support(controller, 'execute_after', + tko_test_id=tko_test_id, success=success) + + + def _run_global_support(self, controller, function_name, **kwargs): + plan = self._planner_rpc.run('get_plan', id=self._plan_id) + if plan['support']: + context = {'model_attributes': afe_model_attributes} + exec plan['support'] in context + function = context.get(function_name) + if function: + if not callable(function): + raise Exception('Global support defines %s, but it is not ' + 'callable' % function_name) + function(controller, **kwargs) --- /dev/null 2009-12-17 12:29:38.000000000 -0800 +++ autotest/frontend/planner/execution_engine_unittest.py 2010-04-06 14:05:56.000000000 -0700 @@ -0,0 +1,293 @@ +#!/usr/bin/python + +import unittest +import common +from autotest_lib.frontend import setup_django_environment +from autotest_lib.client.common_lib.test_utils import mock +from autotest_lib.frontend.afe import frontend_test_utils, models as afe_models +from autotest_lib.frontend.afe import model_attributes as afe_model_attributes +from autotest_lib.frontend.shared import rest_client +from autotest_lib.frontend.planner import models, execution_engine, support +from autotest_lib.frontend.planner import model_attributes + + +class MockObject(object): + """ + Empty mock object class, so that setattr() works on all names + """ + pass + + +class MockAfeRest(object): + jobs = MockObject() + execution_info = MockObject() + queue_entries_request = MockObject() + + +class MockRestJobs(object): + def __init__(self, total_results): + self.total_results = total_results + + +class MockExecutionInfo(object): + execution_info = {} + + +class MockQueueEntriesRequest(object): + queue_entries = object() + + +class MockExecutionEngine(execution_engine.ExecutionEngine): + _planner_rpc = MockObject() + _tko_rpc = object() + _plan_id = object() + _server = object() + _afe_rest = MockAfeRest() + _label_name = object() + + + def __init__(self, *args, **kwargs): + pass + + +class MockTestPlanController(support.TestPlanController): + def __init__(self, *args, **kwargs): + super(MockTestPlanController, self).__init__(machine=None, + test_alias=None) + + +class ExecutionEngineTest(unittest.TestCase, + frontend_test_utils.FrontendTestMixin): + def setUp(self): + self._frontend_common_setup() + self.engine = MockExecutionEngine() + + + def tearDown(self): + self._frontend_common_teardown() + + + def _setup_test_initialize_plan(self): + self.god.stub_function(self.engine._planner_rpc, 'run') + self.god.stub_function(self.engine._afe_rest.jobs, 'get') + self.god.stub_function(self.engine, '_wait_for_initialization') + + + def test_initialize_plan_new_plan(self): + self._setup_test_initialize_plan() + self.god.stub_function(self.engine, '_launch_set_atomic_group_job') + + self.engine._planner_rpc.run.expect_call( + 'get_plan', id=self.engine._plan_id).and_return( + {'name': 'plan'}) + self.engine._afe_rest.jobs.get.expect_call( + name='plan_set_atomic_group').and_return(MockRestJobs(None)) + self.engine._launch_set_atomic_group_job.expect_call( + 'plan_set_atomic_group') + self.engine._wait_for_initialization.expect_call() + + self.engine._initialize_plan() + self.god.check_playback + + + def test_initialize_plan_existing(self): + self._setup_test_initialize_plan() + + self.engine._planner_rpc.run.expect_call( + 'get_plan', id=self.engine._plan_id).and_return( + {'name': 'plan'}) + self.engine._afe_rest.jobs.get.expect_call( + name='plan_set_atomic_group').and_return(MockRestJobs(object())) + self.engine._wait_for_initialization.expect_call() + + self.engine._initialize_plan() + self.god.check_playback + + + def _setup_test_launch_atomic_group_job(self, name): + DUMMY_CONTROL = object() + DUMMY_EXECUTION_INFO = MockExecutionInfo() + DUMMY_QUEUE_ENTRIES_REQUEST = MockQueueEntriesRequest() + + self.god.stub_function(self.engine._planner_rpc, 'run') + self.god.stub_function(self.engine._afe_rest.execution_info, 'get') + self.god.stub_function( + self.engine._afe_rest.queue_entries_request, 'get') + + self.engine._planner_rpc.run.expect_call( + 'get_hosts', plan_id=self.engine._plan_id).and_return( + self.hosts) + self.engine._planner_rpc.run.expect_call( + 'get_atomic_group_control_file').and_return(DUMMY_CONTROL) + self.engine._afe_rest.execution_info.get.expect_call().and_return( + DUMMY_EXECUTION_INFO) + self.engine._afe_rest.queue_entries_request.get.expect_call( + hosts=self.hosts).and_return(DUMMY_QUEUE_ENTRIES_REQUEST) + + DUMMY_EXECUTION_INFO.execution_info = { + 'control_file': DUMMY_CONTROL, + 'cleanup_before_job': afe_model_attributes.RebootBefore.NEVER, + 'cleanup_after_job': afe_model_attributes.RebootAfter.NEVER, + 'run_verify': False, + 'machines_per_execution': len(self.hosts)} + + keyvals = {'server': self.engine._server, + 'label_name': self.engine._label_name, + 'plan_id': self.engine._plan_id} + + job_req = {'name': name, + 'execution_info': DUMMY_EXECUTION_INFO.execution_info, + 'queue_entries': DUMMY_QUEUE_ENTRIES_REQUEST.queue_entries, + 'keyvals': keyvals} + + return job_req + + + def test_launch_atomic_group_job(self): + job_req = self._setup_test_launch_atomic_group_job('atomic_group_job') + self.god.stub_function(self.engine._afe_rest.jobs, 'post') + + self.engine._afe_rest.jobs.post.expect_call(job_req) + + self.engine._launch_set_atomic_group_job('atomic_group_job') + self.god.check_playback() + + + def _setup_mock_controller(self, controller_options): + mock_controller = MockTestPlanController() + for key, value in controller_options.iteritems(): + setattr(mock_controller, key, value) + self.god.stub_with(support, 'TestPlanController', + lambda *args, **kwargs : mock_controller) + return mock_controller + + + def _test_process_finished_runs_helper(self, status, should_block=False, + controller_options={}): + Status = model_attributes.TestRunStatus + TEST_RUN_ID = object() + TKO_TEST_ID = object() + HOST_ID = object() + + mock_controller = self._setup_mock_controller(controller_options) + + self.god.stub_function(self.engine._planner_rpc, 'run') + self.god.stub_function(self.engine, '_run_execute_after') + + test_run = {'id': TEST_RUN_ID, + 'host': {'host': self.hosts[0].hostname, + 'id': HOST_ID}, + 'test_job': {'test_config': {'alias': 'test_alias'}}, + 'tko_test': TKO_TEST_ID, + 'status': status} + + self.engine._planner_rpc.run.expect_call( + 'get_test_runs', + plan__id=self.engine._plan_id, + status__in=(Status.PASSED, Status.FAILED), + finalized=False).and_return([test_run]) + self.engine._run_execute_after.expect_call( + mock_controller, tko_test_id=TKO_TEST_ID, + success=(status == Status.PASSED)) + if should_block: + self.engine._planner_rpc.run.expect_call('modify_host', id=HOST_ID, + blocked=True) + self.engine._planner_rpc.run.expect_call('modify_test_run', + id=TEST_RUN_ID, finalized=True) + + self.engine._process_finished_runs() + + self.god.check_playback() + + + def test_process_finished_runs_pass(self): + self._test_process_finished_runs_helper( + model_attributes.TestRunStatus.PASSED) + + + def test_process_finished_runs_fail(self): + self._test_process_finished_runs_helper( + model_attributes.TestRunStatus.FAILED, should_block=True) + + + def test_process_finished_runs_fail_unblock(self): + self._test_process_finished_runs_helper( + model_attributes.TestRunStatus.FAILED, should_block=False, + controller_options={'_unblock': True}) + + + def _test_schedule_new_runs_helper(self, complete=False, should_skip=False, + controller_options={}): + TEST_CONFIG_ID = object() + + self.god.stub_function(self.engine._planner_rpc, 'run') + self.god.stub_function(self.engine, '_run_execute_before') + + result = {'complete': complete, + 'next_configs': [{'next_test_config_id': TEST_CONFIG_ID, + 'host': self.hosts[0].hostname, + 'next_test_config_alias': object()}]} + + mock_controller = self._setup_mock_controller(controller_options) + + self.engine._planner_rpc.run.expect_call( + 'get_next_test_configs', + plan_id=self.engine._plan_id).and_return(result) + + if not complete: + self.engine._run_execute_before.expect_call(mock_controller) + + if should_skip: + self.engine._planner_rpc.run.expect_call( + 'skip_test', test_config_id=TEST_CONFIG_ID, + hostname=self.hosts[0].hostname) + else: + self.god.stub_function(self.engine, '_run_job') + self.engine._run_job.expect_call( + hostname=self.hosts[0].hostname, + test_config_id=TEST_CONFIG_ID, + cleanup_before_job=mock_controller._reboot_before, + cleanup_after_job=mock_controller._reboot_after, + run_verify=mock_controller._run_verify) + + self.engine._schedule_new_runs() + + self.god.check_playback() + + + def test_schedule_new_runs(self): + self._test_schedule_new_runs_helper() + + + def test_schedule_new_runs_complete(self): + self._test_schedule_new_runs_helper(complete=True) + + + def test_schedule_new_runs_skip(self): + self._test_schedule_new_runs_helper(should_skip=True, + controller_options={'_skip': True}) + + + def test_run_global_support(self): + self._ran_global_support = False + support = """ +def test_global_support(controller): + controller._ran_global_support = True +""" + + DUMMY_PLAN = {'support': support} + + self.god.stub_function(self.engine._planner_rpc, 'run') + + self.engine._planner_rpc.run.expect_call( + 'get_plan', id=self.engine._plan_id).and_return(DUMMY_PLAN) + + self.engine._run_global_support(controller=self, + function_name='test_global_support') + + self.assertTrue(self._ran_global_support) + self.god.check_playback() + + +if __name__ == '__main__': + unittest.main() --- autotest/frontend/planner/models.py 2010-04-05 13:07:24.000000000 -0700 +++ autotest/frontend/planner/models.py 2010-04-06 14:05:56.000000000 -0700 @@ -19,7 +19,7 @@ Optional: label_override: A label to apply to each Autotest job. - support: The global support object to apply to this plan + support: The global support script to apply to this plan """ name = dbmodels.CharField(max_length=255, unique=True) label_override = dbmodels.CharField(max_length=255, null=True, blank=True) @@ -138,12 +138,15 @@ estimated_runtime: Time in hours that the test is expected to run. Will be automatically generated (on the frontend) for tests in Autotest. + skipped_hosts: Hosts that are going to skip this test. """ alias = dbmodels.CharField(max_length=255) control_file = dbmodels.ForeignKey(ControlFile) is_server = dbmodels.BooleanField(default=True) execution_order = dbmodels.IntegerField(blank=True) estimated_runtime = dbmodels.IntegerField() + skipped_hosts = dbmodels.ManyToManyField( + afe_models.Host, db_table='planner_test_configs_skipped_hosts') class Meta: db_table = 'planner_test_configs' --- autotest/frontend/planner/rpc_interface.py 2010-04-05 13:07:24.000000000 -0700 +++ autotest/frontend/planner/rpc_interface.py 2010-04-06 14:05:56.000000000 -0700 @@ -27,12 +27,6 @@ models.Plan.smart_get(id).update_object(data) -def get_test_runs(**filter_data): - return afe_rpc_utils.prepare_for_serialization( - [test_run.get_object_dict() for test_run - in models.TestRun.objects.filter(**filter_data)]) - - def modify_test_run(id, **data): models.TestRun.objects.get(id=id).update_object(data) @@ -70,7 +64,7 @@ is_server: True if is a server-side control file estimated_runtime: estimated number of hours this test will run - @param support: the global support object + @param support: the global support script @param label_override: label to prepend to all AFE jobs for this test plan. Defaults to the plan name. """ @@ -185,9 +179,10 @@ rpc_utils.update_hosts_table(plan) for host in models.Host.objects.filter(plan=plan): - next_test_config_id = rpc_utils.compute_next_test_config(plan, host) - if next_test_config_id: - config = {'next_test_config_id': next_test_config_id, + next_test_config = rpc_utils.compute_next_test_config(plan, host) + if next_test_config: + config = {'next_test_config_id': next_test_config.id, + 'next_test_config_alias': next_test_config.alias, 'host': host.host.hostname} result['next_configs'].append(config) @@ -276,6 +271,33 @@ return result +def get_test_runs(**filter_data): + """ + Gets a list of test runs that match the filter data. + + Returns a list of expanded TestRun object dictionaries. Specifically, the + "host" and "test_job" fields are expanded. Additionally, the "test_config" + field of the "test_job" expansion is also expanded. + """ + result = [] + for test_run in models.TestRun.objects.filter(**filter_data): + test_run_dict = test_run.get_object_dict() + test_run_dict['host'] = test_run.host.get_object_dict() + test_run_dict['test_job'] = test_run.test_job.get_object_dict() + test_run_dict['test_job']['test_config'] = ( + test_run.test_job.test_config.get_object_dict()) + result.append(test_run_dict) + return result + + +def skip_test(test_config_id, hostname): + """ + Marks a test config as "skipped" for a given host + """ + config = models.TestConfig.objects.get(id=test_config_id) + config.skipped_hosts.add(afe_models.Host.objects.get(hostname=hostname)) + + def get_static_data(): result = {'motd': afe_rpc_utils.get_motd()} return result --- autotest/frontend/planner/rpc_interface_unittest.py 2010-04-06 14:05:56.000000000 -0700 +++ autotest/frontend/planner/rpc_interface_unittest.py 2010-04-06 14:05:56.000000000 -0700 @@ -10,6 +10,12 @@ from autotest_lib.frontend.tko import models as tko_models +class DummyTestConfig(object): + def __init__(self): + self.id = object() + self.alias = object() + + class RpcInterfaceTest(unittest.TestCase, planner_test_utils.PlannerTestMixin): def setUp(self): @@ -74,8 +80,8 @@ def test_get_next_test_configs(self): - DUMMY_CONFIGS = {'host1': object(), - 'host2': object()} + DUMMY_CONFIGS = {'host1': DummyTestConfig(), + 'host2': DummyTestConfig()} DUMMY_COMPLETE = object() self.god.stub_function(rpc_utils, 'compute_next_test_config') @@ -95,7 +101,9 @@ for config in result['next_configs']: self.assertTrue(config['host'] in DUMMY_CONFIGS) self.assertEqual(config['next_test_config_id'], - DUMMY_CONFIGS[config['host']]) + DUMMY_CONFIGS[config['host']].id) + self.assertEqual(config['next_test_config_alias'], + DUMMY_CONFIGS[config['host']].alias) def test_update_test_runs(self): --- autotest/frontend/planner/rpc_utils.py 2010-04-06 14:05:56.000000000 -0700 +++ autotest/frontend/planner/rpc_utils.py 2010-04-06 14:05:56.000000000 -0700 @@ -111,8 +111,8 @@ afe_job_ids = afe_jobs.values_list('afe_job', flat=True) hqes = afe_models.HostQueueEntry.objects.filter(job__id__in=afe_job_ids, host=host.host) - if not hqes: - return test_config.id + if not hqes and not bool(test_config.skipped_hosts.filter(host=host)): + return test_config for hqe in hqes: if not hqe.complete: # HostQueueEntry still active for this host, --- autotest/frontend/planner/rpc_utils_unittest.py 2010-04-06 14:05:56.000000000 -0700 +++ autotest/frontend/planner/rpc_utils_unittest.py 2010-04-06 14:05:56.000000000 -0700 @@ -108,7 +108,7 @@ hqe.save() self.assertEqual( - test_config.id, + test_config, rpc_utils.compute_next_test_config(self._plan, self._planner_host)) self.assertFalse(self._planner_host.complete) --- /dev/null 2009-12-17 12:29:38.000000000 -0800 +++ autotest/frontend/planner/support.py 2010-04-06 14:05:56.000000000 -0700 @@ -0,0 +1,84 @@ +import common +from autotest_lib.frontend.afe import model_attributes as afe_model_attributes + +class TestPlanController(object): + """ + Allows a TestPlanSupport to manage the test plan. + + Contains the variables that the TestPlanSupport methods can manipulate, as + well as methods for controlling the flow of the test plan. + """ + def __init__(self, machine, test_alias, *args, **kwargs): + super(TestPlanController, self).__init__(*args, **kwargs) + self.machine = machine + self.test_alias = test_alias + + self._skip = False + self._fail = None + self._unblock = False + + self._reboot_before = afe_model_attributes.RebootBefore.IF_DIRTY + self._reboot_after = afe_model_attributes.RebootAfter.ALWAYS + self._run_verify = True + + + def skip_test(self): + """ + Call this in execute_before() to skip the current test. + """ + self._skip = True + + + def fail_test(self, reason, attributes={}): + """ + Fails the test with the reason and optional attributes provided. + + Call this in execute_before() to force the test to fail, setting the + reason to the provided reason. You may optionally specify some test + attributes to set as well, as a dictionary. + """ + self._fail = (reason, attributes) + + + def unblock(self): + """ + Call this in execute_after() to keep the host unblocked. + + Hosts will block by default if a test fails. If this has been called, + the host will be unblocked and will continue in the plan. + + You do not need to call this method for the test plan to continue if the + test succeeded. Calling this method from a successful run has no effect. + """ + self._unblock = True + + + def set_reboot_before(self, reboot_before): + """ + Sets the upcoming job's "Reboot Before" option. + + Must be a value from the RebootBefore frontend model attributes. + Defaults to IF_DIRTY. + """ + assert reboot_before in afe_model_attributes.RebootBefore.values + self._reboot_before = reboot_before + + + def set_reboot_after(self, reboot_after): + """ + Sets the upcoming job's "Reboot After" option. + + Must be a value from the RebootAfter frontend model attributes. + Defaults to ALWAYS. + """ + assert reboot_after in afe_model_attributes.RebootAfter.values + self._reboot_after = reboot_after + + + def set_run_verify(self, run_verify): + """ + Sets whether or not the job should run the "Verify" stage. + + Defaults to True. + """ + self._run_verify = run_verify _______________________________________________ Autotest mailing list [email protected] http://test.kernel.org/cgi-bin/mailman/listinfo/autotest
