Implementation of the Test Planner execution engine. It is currently able to schedule single-host tests and place the results in the proper planner tables for analysis.
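A minimal sketch of how the engine is driven (this mirrors the rewritten
execution_engine_control.srv in the diff below; the keyvals are the ones
written by rpc_utils.start_plan(), and 'job' is the object autoserv exposes
to server-side control files):

    from autotest_lib.client.common_lib import utils
    from autotest_lib.frontend.planner import execution_engine

    # server, plan_id and label_name keyvals are written into the job's
    # results directory when the plan is started
    keyvals = utils.read_keyval(job.resultdir)

    engine = execution_engine.ExecutionEngine(plan_id=keyvals['plan_id'],
                                              server=keyvals['server'],
                                              label_name=keyvals['label_name'])
    # blocks, ticking every TICK_INTERVAL_SECS, until the plan completes
    engine.start()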
TODO: global support object, execution_engine.py unit tests

Signed-off-by: James Ren <[email protected]>

--- autotest/frontend/afe/model_logic.py 2010-03-08 17:56:20.000000000 -0800
+++ autotest/frontend/afe/model_logic.py 2010-03-08 17:56:20.000000000 -0800
@@ -947,7 +947,7 @@
         if isinstance(id_or_name, (int, long)):
             return manager.get(pk=id_or_name)
-        if isinstance(id_or_name, basestring):
+        if isinstance(id_or_name, basestring) and hasattr(cls, 'name_field'):
             return manager.get(**{cls.name_field : id_or_name})
         raise ValueError(
                 'Invalid positional argument: %s (%s)' % (id_or_name,
--- autotest/frontend/afe/resources.py 2010-03-08 17:56:20.000000000 -0800
+++ autotest/frontend/afe/resources.py 2010-03-08 17:56:20.000000000 -0800
@@ -554,6 +554,8 @@
         for label_name in meta_hosts:
             entry = Label.from_uri_args(self._request, label_name)
             entries.append({'meta_host': entry.link()})
+        if atomic_group_class:
+            entries.append({'atomic_group_class': atomic_group_class})
 
         result = self.link()
         result['queue_entries'] = entries
@@ -675,9 +677,9 @@
             label_entry = containing_collection.resolve_link(
                     queue_entry['meta_host'])
             metahost_label_objects.append(label_entry.instance)
-        if 'atomic_group' in queue_entry:
+        if 'atomic_group_class' in queue_entry:
             atomic_group_entry = containing_collection.resolve_link(
-                    queue_entry['atomic_group'])
+                    queue_entry['atomic_group_class'])
             if atomic_group:
                 assert atomic_group_entry.instance.id == atomic_group.id
             else:
--- autotest/frontend/planner/execution_engine.py 2010-03-08 17:56:20.000000000 -0800
+++ autotest/frontend/planner/execution_engine.py 2010-03-08 17:56:20.000000000 -0800
@@ -1,10 +1,26 @@
+import time, logging
+from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.frontend.planner import model_attributes
+from autotest_lib.server import frontend
+
+
+TICK_INTERVAL_SECS = 10
+
 class ExecutionEngine(object):
     """
     Provides the Test Planner execution engine
     """
 
-    def __init__(self, plan_id):
-        self.plan_id = plan_id
+    _planner_rpc = frontend.Planner()
+    _tko_rpc = frontend.TKO()
+
+    def __init__(self, plan_id, server, label_name):
+        self._plan_id = plan_id
+        self._server = server
+        self._afe_rest = rest_client.Resource.load(
+                'http://%s/afe/server/resources' % server)
+        self._label_name = label_name
 
 
     def start(self):
@@ -13,4 +29,173 @@
         Thread remains in this method until the execution engine is
         complete.
         """
-        pass
+        self._initialize_plan()
+
+        while True:
+            if self._tick():
+                break
+            time.sleep(TICK_INTERVAL_SECS)
+
+        self._cleanup()
+
+
+    def _initialize_plan(self):
+        """
+        Performs actions necessary to start a test plan.
+
+        Adds the hosts into the proper atomic group, and waits for the plan to
+        be ready to start before returning
+        """
+        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+        name = plan['name'] + '_set_atomic_group'
+        if not self._afe_rest.jobs.get(name=name).total_results:
+            self._launch_set_atomic_group_job(name)
+
+        self._wait_for_initialization()
+
+
+    def _launch_set_atomic_group_job(self, name):
+        """
+        Launch the job to set the hosts' atomic group, and initiate the plan
+
+        If the hosts are already part of an atomic group, wait for a tick and
+        try again. Return when successful.
+        """
+        while True:
+            hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
+            control = self._planner_rpc.run('get_atomic_group_control_file')
+
+            info = self._afe_rest.execution_info.get().execution_info
+            info['control_file'] = control
+            info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
+            info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
+            info['run_verify'] = False
+            info['machines_per_execution'] = len(hosts)
+
+            entries = self._afe_rest.queue_entries_request.get(
+                    hosts=hosts).queue_entries
+
+            keyvals = {'server': self._server,
+                       'label_name': self._label_name,
+                       'plan_id': self._plan_id}
+
+            job_req = {'name' : name,
+                       'execution_info' : info,
+                       'queue_entries' : entries,
+                       'keyvals' : keyvals}
+
+            try:
+                self._afe_rest.jobs.post(job_req)
+                logging.info('created job to set atomic group')
+                break
+            except rest_client.ClientError, e:
+                logging.info('hosts already in atomic group')
+                logging.info('(error was %s)' % e.message)
+                logging.info('waiting...')
+                time.sleep(TICK_INTERVAL_SECS)
+
+
+    def _wait_for_initialization(self):
+        while True:
+            plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+            if plan['initialized']:
+                break
+            logging.info('waiting for initialization...')
+            time.sleep(TICK_INTERVAL_SECS)
+
+
+    def _cleanup(self):
+        self._afe_rest.labels.get(name=self._label_name).members[0].delete()
+
+
+    def _tick(self):
+        """
+        Processes one tick of the execution engine.
+
+        Returns True if the engine has completed the plan.
+        """
+        logging.info('tick')
+        self._process_finished_runs()
+        self._check_tko_jobs()
+        return self._schedule_new_runs()
+
+
+    def _process_finished_runs(self):
+        """
+        Finalize the test runs that have finished.
+
+        Look for runs that are in PASSED or FAILED, perform any additional
+        processing required, and set the entry to 'finalized'.
+        """
+        Status = model_attributes.TestRunStatus
+        runs = self._planner_rpc.run('get_test_runs', plan__id=self._plan_id,
+                                     status__in=(Status.PASSED, Status.FAILED),
+                                     finalized=False)
+        for run in runs:
+            logging.info('finalizing test run %s', run)
+            if run['status'] == Status.FAILED:
+                self._planner_rpc.run('modify_host', id=run['host'],
+                                      blocked=True)
+            self._planner_rpc.run('modify_test_run', id=run['id'],
+                                  finalized=True)
+
+
+    def _check_tko_jobs(self):
+        """
+        Instructs the server to update the Planner test runs table
+
+        Sends an RPC to have the server pull the proper TKO tests and add them
+        to the Planner tables. Logs information about what was added.
+ """ + test_runs_updated = self._planner_rpc.run('update_test_runs', + plan_id=self._plan_id) + for update in test_runs_updated: + logging.info('added %s test run for tko test id %s (%s)', + update['status'], update['tko_test_idx'], + update['hostname']) + + + def _schedule_new_runs(self): + next_configs = self._planner_rpc.run('get_next_test_configs', + plan_id=self._plan_id) + if next_configs['complete']: + return True + + for config in next_configs['next_configs']: + self._run_job(hostname=config['host'], + test_config_id=config['next_test_config_id']) + + return False + + + def _run_job(self, hostname, test_config_id): + test_config = self._planner_rpc.run('get_test_config', + id=test_config_id) + + info = self._afe_rest.execution_info.get().execution_info + info['control_file'] = test_config['control_file']['contents'] + info['is_server'] = test_config['is_server'] + + atomic_group_class = self._afe_rest.labels.get( + name=self._label_name).members[0].get().atomic_group_class.href + + request = self._afe_rest.queue_entries_request.get( + hosts=(hostname,), atomic_group_class=atomic_group_class) + entries = request.queue_entries + + plan = self._planner_rpc.run('get_plan', id=self._plan_id) + prefix = plan['label_override'] + if prefix is None: + prefix = plan['name'] + job_req = {'name' : '%s_%s_%s' % (prefix, test_config['alias'], + hostname), + 'execution_info' : info, + 'queue_entries' : entries} + + logging.info('starting test alias %s for host %s', + test_config['alias'], hostname) + job = self._afe_rest.jobs.post(job_req) + self._planner_rpc.run('add_job', + plan_id=self._plan_id, + test_config_id=test_config_id, + afe_job_id=job.get().id) --- autotest/frontend/planner/execution_engine_control.srv 2010-03-08 17:56:20.000000000 -0800 +++ autotest/frontend/planner/execution_engine_control.srv 2010-03-08 17:56:20.000000000 -0800 @@ -1,60 +1,9 @@ -import time from autotest_lib.client.common_lib import utils -from autotest_lib.frontend.afe import model_attributes -from autotest_lib.frontend.shared import rest_client from autotest_lib.frontend.planner import execution_engine -from autotest_lib.server import frontend - - -TICK_INTERVAL_SECS = 10 keyvals = utils.read_keyval(job.resultdir) -planner_rpc = frontend.Planner() -afe_rest = rest_client.Resource.load( - 'http://%s/afe/server/resources' % keyvals['server']) - - -def _launch_set_atomic_group_job(plan, name): - """Launch the job to set the hosts' atomic group, and initate the plan""" - hosts = planner_rpc.run('get_hosts', plan_id=keyvals['plan_id']) - control = planner_rpc.run('get_atomic_group_control_file') - - info = afe_rest.execution_info.get().execution_info - info['control_file'] = control - info['cleanup_before_job'] = model_attributes.RebootBefore.NEVER - info['cleanup_after_job'] = model_attributes.RebootAfter.NEVER - info['run_verify'] = False - info['machines_per_execution'] = len(hosts) - - entries = afe_rest.queue_entries_request.get(hosts=hosts).queue_entries - - job_req = {'name' : name, - 'execution_info' : info, - 'queue_entries' : entries, - 'keyvals' : keyvals} - - afe_rest.jobs.post(job_req) - - -# Check if the plan is already being initialized, and launch the initialization -# job if not -plan = planner_rpc.run('get_plan', id=keyvals['plan_id']) -name = plan['name'] + '_set_atomic_group' -if not afe_rest.jobs.get(name=name).total_results: - _launch_set_atomic_group_job(plan, name) - - -# Wait for the plan to be initialized -while True: - if planner_rpc.run('get_plan', id=keyvals['plan_id'])['initialized']: - 
break - time.sleep(TICK_INTERVAL_SECS) - - -# Execution engine main loop -execution_engine.ExecutionEngine(plan_id=keyvals['plan_id']).start() - - -# Cleanup -afe_rest.labels.get(name=keyvals['label_name']).members[0].delete() +engine = execution_engine.ExecutionEngine(plan_id=keyvals['plan_id'], + server=keyvals['server'], + label_name=keyvals['label_name']) +engine.start() --- autotest/frontend/planner/models.py 2010-03-08 17:56:20.000000000 -0800 +++ autotest/frontend/planner/models.py 2010-03-08 17:56:20.000000000 -0800 @@ -3,7 +3,8 @@ from autotest_lib.frontend.afe import models as afe_models from autotest_lib.frontend.afe import model_logic, rpc_utils from autotest_lib.frontend.tko import models as tko_models -from autotest_lib.client.common_lib import enum, utils +from autotest_lib.frontend.planner import model_attributes +from autotest_lib.client.common_lib import utils class Plan(dbmodels.Model, model_logic.ModelExtensions): @@ -75,12 +76,13 @@ host: The AFE host complete: True if and only if this host is finished in the test plan blocked: True if and only if the host is blocked (not executing tests) + added_by_label: True if and only if the host was added because of a host + label (as opposed to being explicitly added) """ host = dbmodels.ForeignKey(afe_models.Host) complete = dbmodels.BooleanField(default=False) blocked = dbmodels.BooleanField(default=False) - - Status = enum.Enum('Finished', 'Running', 'Blocked', string_values=True) + added_by_label = dbmodels.BooleanField(default=False) class Meta: db_table = 'planner_hosts' @@ -88,30 +90,26 @@ def status(self): if self.complete: - return Host.Status.FINISHED + return model_attributes.HostStatus.FINISHED if self.blocked: - return Host.Status.BLOCKED - return Host.Status.RUNNING + return model_attributes.HostStatus.BLOCKED + return model_attributes.HostStatus.RUNNING def _get_details_unicode(self): return 'Host: %s' % self.host.hostname - @classmethod - def smart_get(cls, id): - raise NotImplementedError('Planner hosts do not support smart_get()') - - -class ControlFile(model_logic.ModelWithHash): +class ControlFile(model_logic.ModelWithHash, + model_logic.ModelExtensions): """A control file. Immutable once added to the table Required: contents: The text of the control file Others: - control_hash: The SHA1 hash of the control file, for duplicate detection - and fast search + the_hash: The SHA1 hash of the control file, for duplicate detection + and fast search """ contents = dbmodels.TextField() @@ -128,12 +126,13 @@ return u'Control file id %s (SHA1: %s)' % (self.id, self.control_hash) -class Test(ModelWithPlan): +class TestConfig(ModelWithPlan, model_logic.ModelExtensions): """A planned test Required: alias: The name to give this test within the plan. Unique with plan id test_control_file: The control file to run + is_server: True if this control file is a server-side test execution_order: An integer describing when this test should be run in the test plan estimated_runtime: Time in hours that the test is expected to run. 
Will @@ -142,27 +141,28 @@ """ alias = dbmodels.CharField(max_length=255) control_file = dbmodels.ForeignKey(ControlFile) + is_server = dbmodels.BooleanField(default=True) execution_order = dbmodels.IntegerField(blank=True) estimated_runtime = dbmodels.IntegerField() class Meta: - db_table = 'planner_tests' + db_table = 'planner_test_configs' ordering = ('execution_order',) unique_together = (('plan', 'alias'),) def _get_details_unicode(self): - return 'Planned test - Control file id %s' % self.control_file.id + return 'Planned test config - Control file id %s' % self.control_file.id -class Job(ModelWithPlan): +class Job(ModelWithPlan, model_logic.ModelExtensions): """Represents an Autotest job initiated for a test plan Required: - test: The Test associated with this Job + test: The TestConfig associated with this Job afe_job: The Autotest job """ - test = dbmodels.ForeignKey(Test) + test_config = dbmodels.ForeignKey(TestConfig) afe_job = dbmodels.ForeignKey(afe_models.Job) class Meta: @@ -189,7 +189,7 @@ return u'Bug external ID %s' % self.external_uid -class TestRun(ModelWithPlan): +class TestRun(ModelWithPlan, model_logic.ModelExtensions): """An individual test run from an Autotest job for the test plan. Each Job object may have multiple TestRun objects associated with it. @@ -209,12 +209,13 @@ Optional: bugs: Bugs filed that a relevant to this run """ - Status = enum.Enum('Active', 'Passed', 'Failed', string_values=True) - test_job = dbmodels.ForeignKey(Job) tko_test = dbmodels.ForeignKey(tko_models.Test) host = dbmodels.ForeignKey(Host) - status = dbmodels.CharField(max_length=16, choices=Status.choices()) + status = dbmodels.CharField( + max_length=16, + choices=model_attributes.TestRunStatus.choices(), + default=model_attributes.TestRunStatus.ACTIVE) finalized = dbmodels.BooleanField(default=False) seen = dbmodels.BooleanField(default=False) triaged = dbmodels.BooleanField(default=False) @@ -224,6 +225,7 @@ class Meta: db_table = 'planner_test_runs' + unique_together = (('plan', 'test_job', 'tko_test', 'host'),) def _get_details_unicode(self): @@ -294,12 +296,11 @@ name: The name given to the object encoded_object: The actual object """ - Type = enum.Enum('support', 'triage', 'autoprocess', 'custom_query', - string_values=True) - user = dbmodels.ForeignKey(afe_models.User) - object_type = dbmodels.CharField(max_length=16, - choices=Type.choices(), db_column='type') + object_type = dbmodels.CharField( + max_length=16, + choices=model_attributes.SavedObjectType.choices(), + db_column='type') name = dbmodels.CharField(max_length=255) encoded_object = dbmodels.TextField() @@ -337,8 +338,8 @@ value: The value Others: - keyval_hash: The result of SHA1(SHA1(key) ++ value), for duplicate - detection and fast search. + the_hash: The result of SHA1(SHA1(key) ++ value), for duplicate + detection and fast search. 
""" key = dbmodels.CharField(max_length=1024) value = dbmodels.CharField(max_length=1024) --- autotest/frontend/planner/rpc_interface.py 2010-03-08 17:56:20.000000000 -0800 +++ autotest/frontend/planner/rpc_interface.py 2010-03-08 17:56:20.000000000 -0800 @@ -11,6 +11,7 @@ from autotest_lib.frontend import thread_local from autotest_lib.frontend.afe import model_logic, models as afe_models from autotest_lib.frontend.afe import rpc_utils as afe_rpc_utils +from autotest_lib.frontend.tko import models as tko_models from autotest_lib.frontend.planner import models, rpc_utils from autotest_lib.client.common_lib import utils @@ -26,6 +27,32 @@ models.Plan.smart_get(id).update_object(data) +def get_test_runs(**filter_data): + return afe_rpc_utils.prepare_for_serialization( + [test_run.get_object_dict() for test_run + in models.TestRun.objects.filter(**filter_data)]) + + +def modify_test_run(id, **data): + models.TestRun.objects.get(id=id).update_object(data) + + +def modify_host(id, **data): + models.Host.objects.get(id=id).update_object(data) + + +def get_test_config(id): + return afe_rpc_utils.prepare_rows_as_nested_dicts( + models.TestConfig.objects.filter(id=id), ('control_file',))[0] + + +def add_job(plan_id, test_config_id, afe_job_id): + models.Job.objects.create( + plan=models.Plan.objects.get(id=plan_id), + test_config=models.TestConfig.objects.get(id=test_config_id), + afe_job=afe_models.Job.objects.get(id=afe_job_id)) + + # more advanced calls def submit_plan(name, hosts, host_labels, tests, @@ -37,7 +64,12 @@ @param hosts: a list of hostnames @param host_labels: a list of host labels. The hosts under test will update to reflect changes in the label - @param tests: a list of test control files to run + @param tests: an ordered list of dictionaries: + alias: an alias for the test + control_file: the test control file + is_server: True if is a server-side control file + estimated_runtime: estimated number of hours this test + will run @param support: the global support object @param label_override: label to prepend to all AFE jobs for this test plan. Defaults to the plan name. 
@@ -60,6 +92,21 @@
             raise model_logic.ValidationError(
                     {'host_labels': 'host label %s does not exist' % label})
 
+    aliases_seen = set()
+    test_required_fields = (
+            'alias', 'control_file', 'is_server', 'estimated_runtime')
+    for test in tests:
+        for field in test_required_fields:
+            if field not in test:
+                raise model_logic.ValidationError(
+                        {'tests': 'field %s is required' % field})
+
+        alias = test['alias']
+        if alias in aliases_seen:
+            raise model_logic.ValidationError(
+                    {'tests': 'alias %s occurs more than once' % alias})
+        aliases_seen.add(alias)
+
     plan, created = models.Plan.objects.get_or_create(name=name)
     if not created:
         raise model_logic.ValidationError(
@@ -67,25 +114,36 @@
 
     try:
         label = rpc_utils.create_plan_label(plan)
+        try:
+            for i, test in enumerate(tests):
+                control, _ = models.ControlFile.objects.get_or_create(
+                        contents=test['control_file'])
+                models.TestConfig.objects.create(
+                        plan=plan, alias=test['alias'], control_file=control,
+                        is_server=test['is_server'], execution_order=i,
+                        estimated_runtime=test['estimated_runtime'])
+
+            plan.label_override = label_override
+            plan.support = support or ''
+            plan.save()
+
+            plan.owners.add(afe_models.User.current_user())
+
+            for host in host_objects:
+                planner_host = models.Host.objects.create(plan=plan, host=host)
+
+            plan.host_labels.add(*label_objects)
+
+            rpc_utils.start_plan(plan, label)
+
+            return plan.id
+        except:
+            label.delete()
+            raise
     except:
         plan.delete()
        raise
 
-    plan.label_override = label_override
-    plan.support = support or ''
-    plan.save()
-
-    plan.owners.add(afe_models.User.current_user())
-
-    for host in host_objects:
-        planner_host = models.Host.objects.create(plan=plan, host=host)
-
-    plan.host_labels.add(*label_objects)
-
-    rpc_utils.start_plan(plan, label)
-
-    return plan.id
-
 
 def get_hosts(plan_id):
     """
@@ -108,3 +166,72 @@
     """
     return rpc_utils.lazy_load(os.path.join(os.path.dirname(__file__),
                                             'set_atomic_group_control.srv'))
+
+
+def get_next_test_configs(plan_id):
+    """
+    Gets information about the next planner test configs that need to be run
+
+    @param plan_id: the ID or name of the test plan
+    @return a dictionary:
+            complete: True or False, shows test plan completion
+            next_configs: a list of dictionaries:
+                    host: hostname of the host
+                    next_test_config_id: ID of the next Planner test to run
+    """
+    plan = models.Plan.smart_get(plan_id)
+
+    result = {'next_configs': []}
+
+    rpc_utils.update_hosts_table(plan)
+    for host in models.Host.objects.filter(plan=plan):
+        next_test_config_id = rpc_utils.compute_next_test_config(plan, host)
+        if next_test_config_id:
+            config = {'next_test_config_id': next_test_config_id,
+                      'host': host.host.hostname}
+            result['next_configs'].append(config)
+
+    rpc_utils.check_for_completion(plan)
+    result['complete'] = plan.complete
+
+    return result
+
+
+def update_test_runs(plan_id):
+    """
+    Add all applicable TKO jobs to the Planner DB tables
+
+    Looks for tests in the TKO tables that were started as a part of the test
+    plan, and adds them to the Planner tables.
+
+    Also updates the status of the test run if the underlying TKO test moves
+    from an active status to a completed status.
+
+    @return a list of dictionaries:
+            status: the status of the new (or updated) test run
+            tko_test_idx: the ID of the TKO test added
+            hostname: the host added
+    """
+    plan = models.Plan.objects.get(id=plan_id)
+    updated = []
+
+    for planner_job in plan.job_set.all():
+        known_statuses = dict((test_run.tko_test.test_idx, test_run.status)
+                              for test_run in planner_job.testrun_set.all())
+        tko_tests_for_job = tko_models.Test.objects.filter(
+                job__afe_job_id=planner_job.afe_job.id)
+
+        for tko_test in tko_tests_for_job:
+            status = rpc_utils.compute_test_run_status(tko_test.status.word)
+            needs_update = (tko_test.test_idx not in known_statuses or
+                            status != known_statuses[tko_test.test_idx])
+            if needs_update:
+                hostnames = tko_test.machine.hostname.split(',')
+                for hostname in hostnames:
+                    rpc_utils.add_test_run(
+                            plan, planner_job, tko_test, hostname, status)
+                    updated.append({'status': status,
+                                    'tko_test_idx': tko_test.test_idx,
+                                    'hostname': hostname})
+
+    return updated
--- autotest/frontend/planner/rpc_interface_unittest.py 2010-03-08 17:56:20.000000000 -0800
+++ autotest/frontend/planner/rpc_interface_unittest.py 2010-03-08 17:56:20.000000000 -0800
@@ -3,10 +3,11 @@
 import unittest
 import common
 from autotest_lib.frontend import setup_django_environment
-from autotest_lib.frontend.planner import planner_test_utils
+from autotest_lib.frontend.planner import planner_test_utils, model_attributes
+from autotest_lib.frontend.planner import rpc_interface, models, rpc_utils
 from autotest_lib.frontend.afe import model_logic
 from autotest_lib.frontend.afe import models as afe_models
-from autotest_lib.frontend.planner import rpc_interface, models, rpc_utils
+from autotest_lib.frontend.tko import models as tko_models
 
 
 class RpcInterfaceTest(unittest.TestCase,
@@ -72,5 +73,110 @@
         self.assertEqual(set(('host1', 'host2')), set(hosts))
 
 
+    def test_get_next_test_configs(self):
+        DUMMY_CONFIGS = {'host1': object(),
+                         'host2': object()}
+        DUMMY_COMPLETE = object()
+        self.god.stub_function(rpc_utils, 'compute_next_test_config')
+
+        for host in models.Host.objects.filter(plan=self._plan):
+            rpc_utils.compute_next_test_config.expect_call(
+                    self._plan, host).and_return(
+                    DUMMY_CONFIGS[host.host.hostname])
+
+        def _dummy_check_for_completion(plan):
+            plan.complete = DUMMY_COMPLETE
+        rpc_utils.check_for_completion = _dummy_check_for_completion
+
+        result = rpc_interface.get_next_test_configs(self._plan.id)
+
+        self.god.check_playback()
+        self.assertEqual(result['complete'], DUMMY_COMPLETE)
+        for config in result['next_configs']:
+            self.assertTrue(config['host'] in DUMMY_CONFIGS)
+            self.assertEqual(config['next_test_config_id'],
+                             DUMMY_CONFIGS[config['host']])
+
+
+    def test_update_test_runs(self):
+        GOOD_STATUS_WORD = 'GOOD'
+        RUNNING_STATUS_WORD = 'RUNNING'
+        hostname = self.hosts[0].hostname
+
+        self.god.stub_function(rpc_utils, 'compute_test_run_status')
+        self.god.stub_function(rpc_utils, 'add_test_run')
+
+        control, _ = models.ControlFile.objects.get_or_create(
+                contents='test_control')
+        test_config = models.TestConfig.objects.create(plan=self._plan,
+                                                       alias='config',
+                                                       control_file=control,
+                                                       execution_order=1,
+                                                       estimated_runtime=1)
+        afe_job = self._create_job(hosts=(1,))
+        planner_host = models.Host.objects.create(plan=self._plan,
+                                                  host=self.hosts[0])
+        planner_job = models.Job.objects.create(plan=self._plan,
+                                                test_config=test_config,
+                                                afe_job=afe_job)
+        tko_machine = tko_models.Machine.objects.create(hostname=hostname)
+        tko_job = tko_models.Job.objects.create(tag='job',
+                                                machine=tko_machine,
+                                                afe_job_id=afe_job.id)
+        tko_kernel = tko_models.Kernel.objects.create()
+        running_status = tko_models.Status.objects.create(
+                word=RUNNING_STATUS_WORD)
+        good_status = tko_models.Status.objects.create(word=GOOD_STATUS_WORD)
+
+        # No TKO tests
+        self.assertEqual([], rpc_interface.update_test_runs(self._plan.id))
+        self.god.check_playback()
+
+        # active TKO test
+        tko_test = tko_models.Test.objects.create(job=tko_job,
+                                                  machine=tko_machine,
+                                                  kernel=tko_kernel,
+                                                  status=running_status)
+
+        rpc_utils.compute_test_run_status.expect_call(
+                RUNNING_STATUS_WORD).and_return(
+                model_attributes.TestRunStatus.ACTIVE)
+        rpc_utils.add_test_run.expect_call(
+                self._plan, planner_job, tko_test, hostname,
+                model_attributes.TestRunStatus.ACTIVE)
+        self.assertEqual(rpc_interface.update_test_runs(self._plan.id),
+                         [{'status': model_attributes.TestRunStatus.ACTIVE,
+                           'tko_test_idx': tko_test.test_idx,
+                           'hostname': hostname}])
+        self.god.check_playback()
+        test_run = models.TestRun.objects.create(
+                plan=self._plan, test_job=planner_job,
+                tko_test=tko_test, host=planner_host,
+                status=model_attributes.TestRunStatus.ACTIVE)
+
+        # no change to TKO test
+        rpc_utils.compute_test_run_status.expect_call(
+                RUNNING_STATUS_WORD).and_return(
+                model_attributes.TestRunStatus.ACTIVE)
+        self.assertEqual([], rpc_interface.update_test_runs(self._plan.id))
+        self.god.check_playback()
+
+        # TKO test is now complete, passed
+        tko_test.status = good_status
+        tko_test.save()
+
+        rpc_utils.compute_test_run_status.expect_call(
+                GOOD_STATUS_WORD).and_return(
+                model_attributes.TestRunStatus.PASSED)
+        rpc_utils.add_test_run.expect_call(
+                self._plan, planner_job, tko_test, hostname,
+                model_attributes.TestRunStatus.PASSED)
+        self.assertEqual(rpc_interface.update_test_runs(self._plan.id),
+                         [{'status': model_attributes.TestRunStatus.PASSED,
+                           'tko_test_idx': tko_test.test_idx,
+                           'hostname': hostname}])
+        self.god.check_playback()
+
+
 if __name__ == '__main__':
     unittest.main()
--- autotest/frontend/planner/rpc_utils.py 2010-03-08 17:56:20.000000000 -0800
+++ autotest/frontend/planner/rpc_utils.py 2010-03-08 17:56:20.000000000 -0800
@@ -1,8 +1,7 @@
 import common
 import os
 from autotest_lib.frontend.afe import models as afe_models, model_logic
-from autotest_lib.frontend.planner import models
-from autotest_lib.frontend.shared import rest_client
+from autotest_lib.frontend.planner import models, model_attributes
 from autotest_lib.client.common_lib import global_config, utils
 
 
@@ -36,23 +35,22 @@
     """
     Takes the necessary steps to start a test plan in Autotest
    """
-    afe_rest = rest_client.Resource.load(
-            'http://%s/afe/server/resources' % SERVER)
-
     keyvals = {'server': SERVER, 'plan_id': plan.id, 'label_name': label.name}
 
-    info = afe_rest.execution_info.get().execution_info
-    info['control_file'] = _get_execution_engine_control()
-    info['machines_per_execution'] = None
-
-    job_req = {'name': plan.name + '_execution_engine',
-               'execution_info': info,
-               'queue_entries': (),
+    options = {'name': plan.name + '_execution_engine',
+               'priority': afe_models.Job.Priority.MEDIUM,
+               'control_file': _get_execution_engine_control(),
+               'control_type': afe_models.Job.ControlType.SERVER,
+               'synch_count': None,
+               'run_verify': False,
+               'reboot_before': False,
+               'reboot_after': False,
+               'dependencies': (),
                'keyvals': keyvals}
-
-    afe_rest.jobs.post(job_req)
+    job = afe_models.Job.create(owner=afe_models.User.current_user().login,
+                                options=options, hosts=())
+    job.queue(hosts=())
 
 
 def _get_execution_engine_control():
@@ -71,3 +69,93 @@
     LAZY_LOADED_FILES[path] = utils.read_file(path)
 
     return LAZY_LOADED_FILES[path]
+
+
+def update_hosts_table(plan):
+    """
+    Resolves the host labels into host objects
+
+    Adds or removes hosts from the planner Hosts model based on changes to the
+    host label
+    """
+    label_hosts = set()
+
+    for label in plan.host_labels.all():
+        for afe_host in label.host_set.all():
+            host, created = models.Host.objects.get_or_create(plan=plan,
+                                                              host=afe_host)
+            if created:
+                host.added_by_label = True
+                host.save()
+
+            label_hosts.add(host.host.id)
+
+    deleted_hosts = models.Host.objects.filter(
+            plan=plan, added_by_label=True).exclude(host__id__in=label_hosts)
+    deleted_hosts.delete()
+
+
+def compute_next_test_config(plan, host):
+    """
+    Gets the next test config that should be run for this plan and host
+
+    Returns None if the host is already running a job. Also sets the host's
+    complete bit if the host is finished running tests.
+    """
+    if host.blocked:
+        return None
+
+    test_configs = plan.testconfig_set.order_by('execution_order')
+    for test_config in test_configs:
+        afe_jobs = plan.job_set.filter(test_config=test_config)
+        afe_job_ids = afe_jobs.values_list('afe_job', flat=True)
+        hqes = afe_models.HostQueueEntry.objects.filter(job__id__in=afe_job_ids,
+                                                        host=host.host)
+        if not hqes:
+            return test_config.id
+        for hqe in hqes:
+            if not hqe.complete:
+                # HostQueueEntry still active for this host,
+                # should not run another test
+                return None
+
+    # All HQEs related to this host are complete
+    host.complete = True
+    host.save()
+    return None
+
+
+def check_for_completion(plan):
+    """
+    Checks if a plan is actually complete. Sets complete=True if so
+    """
+    if not models.Host.objects.filter(plan=plan, complete=False):
+        plan.complete = True
+        plan.save()
+
+
+def compute_test_run_status(status):
+    """
+    Converts a TKO test status to a Planner test run status
+    """
+    Status = model_attributes.TestRunStatus
+    if status == 'GOOD':
+        return Status.PASSED
+    if status == 'RUNNING':
+        return Status.ACTIVE
+    return Status.FAILED
+
+
+def add_test_run(plan, planner_job, tko_test, hostname, status):
+    """
+    Adds a TKO test to the Planner Test Run tables
+    """
+    host = afe_models.Host.objects.get(hostname=hostname)
+
+    planner_host = models.Host.objects.get(plan=plan, host=host)
+    test_run, _ = models.TestRun.objects.get_or_create(plan=plan,
+                                                       test_job=planner_job,
+                                                       tko_test=tko_test,
+                                                       host=planner_host)
+    test_run.status = status
+    test_run.save()
--- autotest/frontend/planner/views.py 2010-03-08 17:56:20.000000000 -0800
+++ autotest/frontend/planner/views.py 2010-03-08 17:56:20.000000000 -0800
@@ -15,3 +15,10 @@
 
 def rpc_documentation(request):
     return rpc_handler_obj.get_rpc_documentation()
+
+
+def model_documentation(request):
+    model_names = ('Plan', 'Host', 'ControlFile', 'TestConfig', 'Job', 'Bug',
+                   'TestRun', 'DataType', 'History', 'SavedObject', 'KeyVal',
+                   'AutoProcess')
+    return views_common.model_documentation(models, model_names)
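As an illustration of the new submit_plan() interface (the plan name, hosts
and control file below are made up, and support/label_override are assumed
to accept None, as the code above handles):

    from autotest_lib.frontend.planner import rpc_interface

    # hypothetical plan: one client-side test on two explicitly listed hosts
    plan_id = rpc_interface.submit_plan(
            name='example_plan',
            hosts=['host1', 'host2'],
            host_labels=[],
            tests=[{'alias': 'sleeptest',
                    'control_file': "job.run_test('sleeptest')",
                    'is_server': False,
                    'estimated_runtime': 1}],    # hours
            support=None,
            label_override=None)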
