Volans has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/361274 )
Change subject: Tests: convert unittest to pytest ...................................................................... Tests: convert unittest to pytest Bug: T154588 Change-Id: Ia75f32b212e36d613c784765a52f1317ac690810 --- M cumin/tests/unit/backends/test_direct.py M cumin/tests/unit/backends/test_puppetdb.py M cumin/tests/unit/test_backends.py M cumin/tests/unit/test_cli.py M cumin/tests/unit/test_grammar.py M cumin/tests/unit/test_query.py M cumin/tests/unit/test_transport.py M cumin/tests/unit/transports/test_clustershell.py M cumin/tests/unit/transports/test_init.py 9 files changed, 563 insertions(+), 570 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/software/cumin refs/changes/74/361274/1 diff --git a/cumin/tests/unit/backends/test_direct.py b/cumin/tests/unit/backends/test_direct.py index feaea9b..d63440b 100644 --- a/cumin/tests/unit/backends/test_direct.py +++ b/cumin/tests/unit/backends/test_direct.py @@ -1,80 +1,77 @@ """Direct backend tests.""" -import unittest +import pytest from ClusterShell.NodeSet import NodeSet from cumin.backends import BaseQuery, InvalidQueryError, direct -class TestDirectQueryClass(unittest.TestCase): - """Direct backend query_class test class.""" - - def test_query_class(self): - """An instance of query_class should be an instance of BaseQuery.""" - query = direct.query_class({}) - self.assertIsInstance(query, BaseQuery) +def test_direct_query_class(): + """An instance of query_class should be an instance of BaseQuery.""" + query = direct.query_class({}) + assert isinstance(query, BaseQuery) -class TestDirectQuery(unittest.TestCase): +class TestDirectQuery(object): """Direct backend query test class.""" - def setUp(self): - """Setup an instace of DirectQuery for each test.""" - self.query = direct.DirectQuery({}) + def setup_method(self, _): + """Setup an instance of DirectQuery for each test.""" + self.query = direct.DirectQuery({}) # pylint: disable=attribute-defined-outside-init def test_instantiation(self): """An instance of DirectQuery should be an instance of BaseQuery.""" - self.assertIsInstance(self.query, BaseQuery) - self.assertDictEqual(self.query.config, {}) + assert isinstance(self.query, BaseQuery) + assert self.query.config == {} def test_add_category_fact(self): """Calling add_category() should raise InvalidQueryError.""" - with self.assertRaisesRegexp(InvalidQueryError, r"Category tokens are not supported"): + with pytest.raises(InvalidQueryError, match='Category tokens are not supported'): self.query.add_category('F', 'key', 'value') def test_add_hosts(self): """Calling add_hosts() should add the hosts to the NodeSet.""" - self.assertListEqual(list(self.query.hosts), []) + assert list(self.query.hosts) == [] # No hosts self.query.add_hosts(NodeSet.fromlist([])) - self.assertListEqual(list(self.query.hosts), []) + assert list(self.query.hosts) == [] # Single host self.query.add_hosts(NodeSet.fromlist(['host'])) - self.assertListEqual(list(self.query.hosts), ['host']) + assert list(self.query.hosts) == ['host'] # Multiple hosts self.query.add_hosts(NodeSet.fromlist(['host1', 'host2'])) - self.assertListEqual(list(self.query.hosts), ['host', 'host1', 'host2']) + assert list(self.query.hosts) == ['host', 'host1', 'host2'] # Negated query self.query.add_hosts(NodeSet.fromlist(['host1']), neg=True) - self.assertListEqual(list(self.query.hosts), ['host', 'host2']) + assert list(self.query.hosts) == ['host', 'host2'] # Globbing is not supported - with self.assertRaisesRegexp(InvalidQueryError, r"Hosts globbing is not supported"): + with pytest.raises(InvalidQueryError, match='Hosts globbing is not supported'): self.query.add_hosts(NodeSet.fromlist(['host1*'])) def test_open_subgroup(self): """Calling open_subgroup() should raise InvalidQueryError.""" - with self.assertRaisesRegexp(InvalidQueryError, r"Subgroups are not supported"): + with pytest.raises(InvalidQueryError, matach='Subgroups are not supported'): self.query.open_subgroup() def test_close_subgroup(self): """Calling close_subgroup() should raise InvalidQueryError.""" - with self.assertRaisesRegexp(InvalidQueryError, r"Subgroups are not supported"): + with pytest.raises(InvalidQueryError, match='Subgroups are not supported'): self.query.close_subgroup() def test_add_and(self): """Calling add_and() should raise InvalidQueryError.""" - with self.assertRaisesRegexp(InvalidQueryError, r"Boolean AND operator is not supported"): + with pytest.raises(InvalidQueryError, match='Boolean AND operator is not supported'): self.query.add_and() def test_add_or(self): """Calling add_or() should be a noop.""" - self.assertListEqual(list(self.query.hosts), []) + assert list(self.query.hosts) == [] self.query.add_or() - self.assertListEqual(list(self.query.hosts), []) + assert list(self.query.hosts) == [] def test_execute(self): """Calling execute() should return the list of hosts.""" - self.assertListEqual(list(self.query.hosts), self.query.execute()) + assert list(self.query.hosts) == self.query.execute() self.query.add_hosts(NodeSet.fromlist(['host1', 'host2'])) - self.assertListEqual(list(self.query.hosts), self.query.execute()) + assert list(self.query.hosts) == self.query.execute() diff --git a/cumin/tests/unit/backends/test_puppetdb.py b/cumin/tests/unit/backends/test_puppetdb.py index b3ee47b..081bf56 100644 --- a/cumin/tests/unit/backends/test_puppetdb.py +++ b/cumin/tests/unit/backends/test_puppetdb.py @@ -1,7 +1,6 @@ """PuppetDB backend tests.""" # pylint: disable=invalid-name -import unittest - +import pytest import requests_mock from requests.exceptions import HTTPError @@ -9,162 +8,157 @@ from cumin.backends import BaseQuery, InvalidQueryError, puppetdb -class TestPuppetDBQueryClass(unittest.TestCase): - """PuppetDB backend query_class test class.""" - - def test_query_class(self): - """An instance of query_class should be an instance of BaseQuery.""" - query = puppetdb.query_class({}) - self.assertIsInstance(query, BaseQuery) +def test_puppetdb_query_class(): + """An instance of query_class should be an instance of BaseQuery.""" + query = puppetdb.query_class({}) + assert isinstance(query, BaseQuery) -class TestPuppetDBQuery(unittest.TestCase): +class TestPuppetDBQuery(object): """PuppetDB backend query test class.""" - def setUp(self): + def setup_method(self, _): """Setup an instace of PuppetDBQuery for each test.""" - self.query = puppetdb.PuppetDBQuery({}) + self.query = puppetdb.PuppetDBQuery({}) # pylint: disable=attribute-defined-outside-init def test_instantiation(self): """An instance of PuppetDBQuery should be an instance of BaseQuery.""" - self.assertIsInstance(self.query, BaseQuery) - self.assertEqual(self.query.url, 'https://localhost:443/v3/') + assert isinstance(self.query, BaseQuery) + assert self.query.url == 'https://localhost:443/v3/' def test_category_getter(self): """Access to category property should return facts by default.""" - self.assertEqual(self.query.category, 'F') + assert self.query.category == 'F' def test_category_setter(self): """Setting category property should accept only valid values, raise InvalidQueryError otherwise.""" self.query.category = 'F' - self.assertEqual(self.query.category, 'F') + assert self.query.category == 'F' - with self.assertRaisesRegexp(InvalidQueryError, r"Invalid value 'invalid_value'"): + with pytest.raises(InvalidQueryError, match="Invalid value 'invalid_value'"): self.query.category = 'invalid_value' - with self.assertRaisesRegexp(InvalidQueryError, r"Mixed F: and R: queries are currently not supported"): + with pytest.raises(InvalidQueryError, match='Mixed F: and R: queries are currently not supported'): self.query.category = 'R' # Get a new query object to test also setting a resource before a fact query = puppetdb.query_class({}) - self.assertEqual(query.category, 'F') + assert query.category == 'F' query.category = 'R' - self.assertEqual(query.category, 'R') + assert query.category == 'R' - with self.assertRaisesRegexp(InvalidQueryError, r"Mixed F: and R: queries are currently not supported"): + with pytest.raises(InvalidQueryError, match='Mixed F: and R: queries are currently not supported'): query.category = 'F' def test_add_category_fact(self): """Calling add_category() with a fact should add the proper query token to the object.""" - self.assertListEqual(self.query.current_group['tokens'], []) + assert self.query.current_group['tokens'] == [] # Base fact query self.query.add_category('F', 'key', 'value') - self.assertListEqual(self.query.current_group['tokens'], ['["=", ["fact", "key"], "value"]']) + assert self.query.current_group['tokens'] == ['["=", ["fact", "key"], "value"]'] self.query.current_group['tokens'] = [] # Negated query self.query.add_category('F', 'key', 'value', neg=True) - self.assertListEqual(self.query.current_group['tokens'], ['["not", ["=", ["fact", "key"], "value"]]']) + assert self.query.current_group['tokens'] == ['["not", ["=", ["fact", "key"], "value"]]'] self.query.current_group['tokens'] = [] # Different operator self.query.add_category('F', 'key', 'value', operator='>=') - self.assertListEqual(self.query.current_group['tokens'], ['[">=", ["fact", "key"], "value"]']) + assert self.query.current_group['tokens'] == ['[">=", ["fact", "key"], "value"]'] self.query.current_group['tokens'] = [] # Regex operator self.query.add_category('F', 'key', r'value\\escaped', operator='~') - self.assertListEqual(self.query.current_group['tokens'], [r'["~", ["fact", "key"], "value\\\\escaped"]']) + assert self.query.current_group['tokens'] == [r'["~", ["fact", "key"], "value\\\\escaped"]'] # != is not supported by PuppetDB - with self.assertRaisesRegexp(InvalidQueryError, r"PuppetDB backend doesn't support"): + with pytest.raises(InvalidQueryError, match="PuppetDB backend doesn't support"): self.query.add_category('F', 'key', 'value', operator='!=') def test_add_category_resource_base(self): """Calling add_category() with a base resource query should add the proper query token to the object.""" - self.assertListEqual(self.query.current_group['tokens'], []) + assert self.query.current_group['tokens'] == [] self.query.add_category('R', 'key', 'value') - self.assertListEqual(self.query.current_group['tokens'], - ['["and", ["=", "type", "Key"], ["=", "title", "value"]]']) + assert self.query.current_group['tokens'] == ['["and", ["=", "type", "Key"], ["=", "title", "value"]]'] def test_add_category_resource_class(self): """Calling add_category() with a class resource query should add the proper query token to the object.""" - self.assertListEqual(self.query.current_group['tokens'], []) + assert self.query.current_group['tokens'] == [] self.query.add_category('R', 'class', 'classtitle') - self.assertListEqual(self.query.current_group['tokens'], - ['["and", ["=", "type", "Class"], ["=", "title", "Classtitle"]]']) + assert self.query.current_group['tokens'] == ['["and", ["=", "type", "Class"], ["=", "title", "Classtitle"]]'] def test_add_category_resource_class_path(self): """Calling add_category() with a class resource query should add the proper query token to the object.""" - self.assertListEqual(self.query.current_group['tokens'], []) + assert self.query.current_group['tokens'] == [] self.query.add_category('R', 'class', 'resource::path::to::class') - self.assertListEqual(self.query.current_group['tokens'], - ['["and", ["=", "type", "Class"], ["=", "title", "Resource::Path::To::Class"]]']) + assert self.query.current_group['tokens'] == \ + ['["and", ["=", "type", "Class"], ["=", "title", "Resource::Path::To::Class"]]'] def test_add_category_resource_neg(self): """Calling add_category() with a negated resource query should add the proper query token to the object.""" self.query.add_category('R', 'key', 'value', neg=True) - self.assertListEqual(self.query.current_group['tokens'], - ['["not", ["and", ["=", "type", "Key"], ["=", "title", "value"]]]']) + assert self.query.current_group['tokens'] == \ + ['["not", ["and", ["=", "type", "Key"], ["=", "title", "value"]]]'] def test_add_category_resource_regex(self): """Calling add_category() with a regex resource query should add the proper query token to the object.""" self.query.add_category('R', 'key', r'value\\escaped', operator='~') - self.assertListEqual(self.query.current_group['tokens'], - [r'["and", ["=", "type", "Key"], ["~", "title", "value\\\\escaped"]]']) + assert self.query.current_group['tokens'] == \ + [r'["and", ["=", "type", "Key"], ["~", "title", "value\\\\escaped"]]'] def test_add_category_resource_class_regex(self): """Calling add_category() with a regex Class resource query should add the proper query token to the object.""" self.query.add_category('R', 'Class', r'Role::(One|Another)', operator='~') - self.assertListEqual(self.query.current_group['tokens'], - [r'["and", ["=", "type", "Class"], ["~", "title", "Role::(One|Another)"]]']) + assert self.query.current_group['tokens'] == \ + [r'["and", ["=", "type", "Class"], ["~", "title", "Role::(One|Another)"]]'] def test_add_category_resource_parameter(self): """Calling add_category() with a resource's parameter query should add the proper query token to the object.""" self.query.add_category('R', 'resource%param', 'value') - self.assertListEqual(self.query.current_group['tokens'], - ['["and", ["=", "type", "Resource"], ["=", ["parameter", "param"], "value"]]']) + assert self.query.current_group['tokens'] == \ + ['["and", ["=", "type", "Resource"], ["=", ["parameter", "param"], "value"]]'] def test_add_category_resource_parameter_regex(self): """Calling add_category() with a resource's parameter query with a regex should raise InvalidQueryError.""" - with self.assertRaisesRegexp(InvalidQueryError, 'Regex operations are not supported in PuppetDB'): + with pytest.raises(InvalidQueryError, match='Regex operations are not supported in PuppetDB'): self.query.add_category('R', 'resource%param', 'value.*', operator='~') def test_add_category_resource_field(self): """Calling add_category() with a resource's field query should add the proper query token to the object.""" self.query.add_category('R', 'resource@field', 'value') - self.assertListEqual(self.query.current_group['tokens'], - ['["and", ["=", "type", "Resource"], ["=", "field", "value"]]']) + assert self.query.current_group['tokens'] == \ + ['["and", ["=", "type", "Resource"], ["=", "field", "value"]]'] def test_add_category_resource(self): """Calling add_category() with a resource type should add the proper query token to the object.""" self.query.add_category('R', 'Resource') - self.assertListEqual(self.query.current_group['tokens'], ['["and", ["=", "type", "Resource"]]']) + assert self.query.current_group['tokens'] == ['["and", ["=", "type", "Resource"]]'] def test_add_category_resource_parameter_field(self): """Calling add_category() with both a parameter and a field should raise InvalidQueryError.""" - with self.assertRaisesRegexp(InvalidQueryError, 'Resource key cannot contain both'): + with pytest.raises(InvalidQueryError, match='Resource key cannot contain both'): self.query.add_category('R', 'resource@field%param') def test_add_hosts(self): """Calling add_hosts() with a resource should add the proper query token to the object.""" - self.assertListEqual(self.query.current_group['tokens'], []) + assert self.query.current_group['tokens'] == [] # No hosts self.query.add_hosts([]) - self.assertListEqual(self.query.current_group['tokens'], []) + assert self.query.current_group['tokens'] == [] # Single host self.query.add_hosts(['host']) - self.assertListEqual(self.query.current_group['tokens'], ['["or", ["=", "{host_key}", "host"]]']) + assert self.query.current_group['tokens'] == ['["or", ["=", "{host_key}", "host"]]'] self.query.current_group['tokens'] = [] # Multiple hosts self.query.add_hosts(['host1', 'host2']) - self.assertListEqual(self.query.current_group['tokens'], - ['["or", ["=", "{host_key}", "host1"], ["=", "{host_key}", "host2"]]']) + assert self.query.current_group['tokens'] == \ + ['["or", ["=", "{host_key}", "host1"], ["=", "{host_key}", "host2"]]'] self.query.current_group['tokens'] = [] # Negated query self.query.add_hosts(['host1', 'host2'], neg=True) - self.assertListEqual(self.query.current_group['tokens'], - ['["not", ["or", ["=", "{host_key}", "host1"], ["=", "{host_key}", "host2"]]]']) + assert self.query.current_group['tokens'] == \ + ['["not", ["or", ["=", "{host_key}", "host1"], ["=", "{host_key}", "host2"]]]'] self.query.current_group['tokens'] = [] # Globbing hosts self.query.add_hosts(['host1*.domain']) - self.assertListEqual(self.query.current_group['tokens'], [r'["or", ["~", "{host_key}", "^host1.*\\.domain$"]]']) + assert self.query.current_group['tokens'] == [r'["or", ["~", "{host_key}", "^host1.*\\.domain$"]]'] def test_open_subgroup(self): """Calling open_subgroup() should open a subgroup and relate it to it's parent.""" @@ -173,28 +167,28 @@ parent['tokens'].append(child) self.query.open_subgroup() self.query.add_hosts(['host']) - self.assertListEqual(self.query.current_group['tokens'], child['tokens']) - self.assertIsNotNone(self.query.current_group['parent']) + assert self.query.current_group['tokens'] == child['tokens'] + assert self.query.current_group['parent'] is not None def test_close_subgroup(self): """Calling close_subgroup() should close a subgroup and return to the parent's context.""" self.query.open_subgroup() self.query.close_subgroup() - self.assertEqual(len(self.query.current_group['tokens']), 1) - self.assertListEqual(self.query.current_group['tokens'][0]['tokens'], []) - self.assertIsNone(self.query.current_group['parent']) + assert len(self.query.current_group['tokens']) == 1 + assert self.query.current_group['tokens'][0]['tokens'] == [] + assert self.query.current_group['parent'] is None def test_add_and(self): """Calling add_and() should set the boolean property to the current group to 'and'.""" - self.assertIsNone(self.query.current_group['bool']) + assert self.query.current_group['bool'] is None self.query.add_and() - self.assertEqual(self.query.current_group['bool'], 'and') + assert self.query.current_group['bool'] == 'and' def test_add_or(self): """Calling add_or() should set the boolean property to the current group to 'or'.""" - self.assertIsNone(self.query.current_group['bool']) + assert self.query.current_group['bool'] is None self.query.add_or() - self.assertEqual(self.query.current_group['bool'], 'or') + assert self.query.current_group['bool'] == 'or' def test_add_and_or(self): """Calling add_or() and add_and() in the same group should raise InvalidQueryError.""" @@ -202,16 +196,17 @@ self.query.add_or() self.query.add_hosts(['host2']) - with self.assertRaises(InvalidQueryError): + with pytest.raises(InvalidQueryError): self.query.add_and() @requests_mock.Mocker() -class TestPuppetDBQueryExecute(unittest.TestCase): +class TestPuppetDBQueryExecute(object): """PuppetDBQuery test execute() method class.""" - def setUp(self): + def setup_method(self, _): """Setup an instace of PuppetDBQuery for each test.""" + # pylint: disable=attribute-defined-outside-init self.query = puppetdb.PuppetDBQuery({'puppetdb': {'urllib3_disable_warnings': ['SubjectAltNameWarning']}}) def _register_uris(self, requests): @@ -235,16 +230,16 @@ self._register_uris(requests) self.query.add_hosts(['nodes_host1', 'nodes_host2']) hosts = self.query.execute() - self.assertListEqual(sorted(hosts), ['nodes_host1', 'nodes_host2']) - self.assertEqual(requests.call_count, 1) + assert sorted(hosts) == ['nodes_host1', 'nodes_host2'] + assert requests.call_count == 1 def test_resources_endpoint(self, requests): """Calling execute() with a query that goes to the resources endpoint should return the list of hosts.""" self._register_uris(requests) self.query.add_category('R', 'Class', 'value') hosts = self.query.execute() - self.assertListEqual(sorted(hosts), ['resources_host1', 'resources_host2']) - self.assertEqual(requests.call_count, 1) + assert sorted(hosts) == ['resources_host1', 'resources_host2'] + assert requests.call_count == 1 def test_with_boolean_operator(self, requests): """Calling execute() with a query with a boolean operator should return the list of hosts.""" @@ -253,8 +248,8 @@ self.query.add_or() self.query.add_hosts(['nodes_host2']) hosts = self.query.execute() - self.assertListEqual(sorted(hosts), ['nodes_host1', 'nodes_host2']) - self.assertEqual(requests.call_count, 1) + assert sorted(hosts) == ['nodes_host1', 'nodes_host2'] + assert requests.call_count == 1 def test_with_subgroup(self, requests): """Calling execute() with a query with a subgroup return the list of hosts.""" @@ -265,23 +260,23 @@ self.query.add_hosts(['nodes_host2']) self.query.close_subgroup() hosts = self.query.execute() - self.assertListEqual(sorted(hosts), ['nodes_host1', 'nodes_host2']) - self.assertEqual(requests.call_count, 1) + assert sorted(hosts) == ['nodes_host1', 'nodes_host2'] + assert requests.call_count == 1 def test_empty(self, requests): """Calling execute() with a query that return no hosts should return an empty list.""" self._register_uris(requests) hosts = self.query.execute() - self.assertListEqual(hosts, []) - self.assertEqual(requests.call_count, 1) + assert hosts == [] + assert requests.call_count == 1 def test_error(self, requests): """Calling execute() if the request fails it should raise the requests exception.""" self._register_uris(requests) self.query.current_group['tokens'].append('invalid_query') - with self.assertRaises(HTTPError): + with pytest.raises(HTTPError): self.query.execute() - self.assertEqual(requests.call_count, 1) + assert requests.call_count == 1 def test_complex_query(self, requests): """Calling execute() with a complex query should return the exptected structure.""" @@ -299,5 +294,5 @@ self.query.add_and() self.query.add_category('R', 'Class', value='MyClass', operator='=') hosts = self.query.execute() - self.assertListEqual(sorted(hosts), ['resources_host1', 'resources_host2']) - self.assertEqual(requests.call_count, 1) + assert sorted(hosts) == ['resources_host1', 'resources_host2'] + assert requests.call_count == 1 diff --git a/cumin/tests/unit/test_backends.py b/cumin/tests/unit/test_backends.py index ad7988d..2027c5e 100644 --- a/cumin/tests/unit/test_backends.py +++ b/cumin/tests/unit/test_backends.py @@ -1,14 +1,11 @@ """Abstract query tests.""" -import unittest +import pytest from cumin.backends import BaseQuery -class TestBaseQuery(unittest.TestCase): - """Class BaseQuery tests.""" - - def test_instantiation(self): - """Class BaseQuery is not instantiable being an abstract class.""" - with self.assertRaises(TypeError): - BaseQuery({}) # pylint: disable=abstract-class-instantiated +def test_base_query_instantiation(): + """Class BaseQuery is not instantiable being an abstract class.""" + with pytest.raises(TypeError): + BaseQuery({}) # pylint: disable=abstract-class-instantiated diff --git a/cumin/tests/unit/test_cli.py b/cumin/tests/unit/test_cli.py index 5290419..f224a9b 100644 --- a/cumin/tests/unit/test_cli.py +++ b/cumin/tests/unit/test_cli.py @@ -1,12 +1,11 @@ """CLI tests.""" - import os import tempfile -import unittest from logging import DEBUG, INFO import mock +import pytest from cumin import cli, CuminError @@ -16,201 +15,213 @@ _ARGV = ['-c', 'doc/examples/config.yaml', '-d', '-m', 'sync', 'host', 'command1', 'command2'] -class TestCLI(unittest.TestCase): - """CLI module tests.""" +def _validate_parsed_args(args, no_commands=False): + """Validate that the parsed args have the proper values.""" + assert args.debug + assert args.config == 'doc/examples/config.yaml' + assert args.hosts == 'host' + if no_commands: + assert args.dry_run + else: + assert args.commands == ['command1', 'command2'] - def _validate_parsed_args(self, args, no_commands=False): - """Validate that the parsed args have the proper values.""" - self.assertTrue(args.debug) - self.assertEqual(args.config, 'doc/examples/config.yaml') - self.assertEqual(args.hosts, 'host') - if no_commands: - self.assertTrue(args.dry_run) - else: - self.assertEqual(args.commands, ['command1', 'command2']) - def test_parse_args_ok(self): - """A standard set of command line parameters should be properly parsed into their respective variables.""" - args = cli.parse_args(argv=_ARGV) - self._validate_parsed_args(args) +def test_parse_args_ok(): + """A standard set of command line parameters should be properly parsed into their respective variables.""" + args = cli.parse_args(argv=_ARGV) + _validate_parsed_args(args) - with mock.patch.object(cli.sys, 'argv', ['progname'] + _ARGV): - args = cli.parse_args() - self._validate_parsed_args(args) + with mock.patch.object(cli.sys, 'argv', ['progname'] + _ARGV): + args = cli.parse_args() + _validate_parsed_args(args) - def test_parse_args_no_commands(self): - """If no commands are specified, dry-run mode should be implied.""" - args = cli.parse_args(argv=_ARGV[:-2]) - self._validate_parsed_args(args, no_commands=True) - def test_parse_args_no_mode(self): - """If mode is not speficied with multiple commands, parsing the args should raise a parser error.""" - index = _ARGV.index('-m') - with self.assertRaises(SystemExit): - cli.parse_args(argv=_ARGV[:index] + _ARGV[index + 1:]) +def test_parse_args_no_commands(): + """If no commands are specified, dry-run mode should be implied.""" + args = cli.parse_args(argv=_ARGV[:-2]) + _validate_parsed_args(args, no_commands=True) - def test_get_running_user(self): - """Unsufficient permissions or unknown user should raise RuntimeError and a proper user should be detected.""" - env = {'USER': None, 'SUDO_USER': None} - with mock.patch('os.getenv', env.get): - with self.assertRaisesRegexp(CuminError, r'Unsufficient privileges, run with sudo'): - cli.get_running_user() - env = {'USER': 'root', 'SUDO_USER': None} - with mock.patch('os.getenv', env.get): - with self.assertRaisesRegexp(CuminError, r'Unable to determine real user'): - cli.get_running_user() +def test_parse_args_no_mode(): + """If mode is not speficied with multiple commands, parsing the args should raise a parser error.""" + index = _ARGV.index('-m') + with pytest.raises(SystemExit): + cli.parse_args(argv=_ARGV[:index] + _ARGV[index + 1:]) - with mock.patch('os.getenv', _ENV.get): - self.assertEqual(cli.get_running_user(), 'user') - @mock.patch('cumin.cli.os') - @mock.patch('cumin.cli.RotatingFileHandler') - @mock.patch('cumin.cli.logger') - def test_setup_logging(self, logging, file_handler, mocked_os): - """Calling setup_logging() should properly setup the logger.""" - mocked_os.path.exists.return_value = False - cli.setup_logging('/path/to/filename') - logging.setLevel.assert_called_with(INFO) +def test_get_running_user(): + """Unsufficient permissions or unknown user should raise RuntimeError and a proper user should be detected.""" + env = {'USER': None, 'SUDO_USER': None} + with mock.patch('os.getenv', env.get): + with pytest.raises(CuminError, match='Unsufficient privileges, run with sudo'): + cli.get_running_user() - mocked_os.path.exists.return_value = True - cli.setup_logging('filename', debug=True) - logging.setLevel.assert_called_with(DEBUG) - self.assertTrue(file_handler.called) + env = {'USER': 'root', 'SUDO_USER': None} + with mock.patch('os.getenv', env.get): + with pytest.raises(CuminError, match='Unable to determine real user'): + cli.get_running_user() - def test_parse_config_ok(self): - """The configuration file is properly parsed and accessible.""" - config = cli.parse_config('doc/examples/config.yaml') - self.assertTrue('log_file' in config) + with mock.patch('os.getenv', _ENV.get): + assert cli.get_running_user() == 'user' - def test_parse_config_non_existent(self): - """A CuminError is raised if the configuration file is not available.""" - with self.assertRaisesRegexp(CuminError, 'Unable to read configuration file'): - cli.parse_config('not_existent_config.yaml') - def test_parse_config_invalid(self): - """A CuminError is raised if the configuration cannot be parsed.""" - invalid_yaml = '\n'.join(( - 'foo:', - ' bar: baz', - ' - foobar', - )) - tmpfile, tmpfilepath = tempfile.mkstemp(suffix='config.yaml', prefix='cumin', text=True) - os.write(tmpfile, invalid_yaml) +@mock.patch('cumin.cli.os') +@mock.patch('cumin.cli.RotatingFileHandler') +@mock.patch('cumin.cli.logger') +def test_setup_logging(logging, file_handler, mocked_os): + """Calling setup_logging() should properly setup the logger.""" + mocked_os.path.exists.return_value = False + cli.setup_logging('/path/to/filename') + logging.setLevel.assert_called_with(INFO) - with self.assertRaisesRegexp(CuminError, 'Unable to parse configuration file'): - cli.parse_config(tmpfilepath) + mocked_os.path.exists.return_value = True + cli.setup_logging('filename', debug=True) + logging.setLevel.assert_called_with(DEBUG) + assert file_handler.called - @mock.patch('cumin.cli.stderr') - @mock.patch('cumin.cli.raw_input') - @mock.patch('cumin.cli.sys.stdout.isatty') - @mock.patch('cumin.cli.logger') - def test_sigint_handler(self, logging, isatty, mocked_raw_input, stderr): # pylint: disable=unused-argument - """Calling the SIGINT handler should raise KeyboardInterrupt or not based on tty and answer.""" - # Signal handler called without a tty - isatty.return_value = False - with self.assertRaises(cli.KeyboardInterruptError): - cli.sigint_handler(1, None) - # Signal handler called with a tty - isatty.return_value = True - with self.assertRaises(cli.KeyboardInterruptError): - cli.sigint_handler(1, None) +def test_parse_config_ok(): + """The configuration file is properly parsed and accessible.""" + config = cli.parse_config('doc/examples/config.yaml') + assert 'log_file' in config - # # Signal handler called with a tty, answered 'y' - # isatty.return_value = True - # mocked_raw_input.return_value = 'y' - # with self.assertRaises(cli.KeyboardInterruptError): - # cli.sigint_handler(1, None) - # - # # Signal handler called with a tty, answered 'n' - # isatty.return_value = True - # mocked_raw_input.return_value = 'n' - # self.assertIsNone(cli.sigint_handler(1, None)) - # - # # Signal handler called with a tty, answered 'invalid_answer' - # isatty.return_value = True - # mocked_raw_input.return_value = 'invalid_answer' - # with self.assertRaises(cli.KeyboardInterruptError): - # cli.sigint_handler(1, None) - # - # # Signal handler called with a tty, empty answer - # isatty.return_value = True - # mocked_raw_input.return_value = '' - # with self.assertRaises(cli.KeyboardInterruptError): - # cli.sigint_handler(1, None) - @mock.patch('cumin.cli.tqdm') - def test_stderr(self, tqdm): - """Calling stderr() should call tqdm.write().""" - cli.stderr('message') - self.assertTrue(tqdm.write.called) +def test_parse_config_non_existent(): + """A CuminError is raised if the configuration file is not available.""" + with pytest.raises(CuminError, match='Unable to read configuration file'): + cli.parse_config('not_existent_config.yaml') - @mock.patch('cumin.cli.stderr') - @mock.patch('cumin.cli.raw_input') - @mock.patch('cumin.cli.sys.stdout.isatty') - def test_get_hosts_ok(self, isatty, mocked_raw_input, stderr): - """Calling get_hosts() should query the backend and return the list of hosts.""" - args = cli.parse_args(argv=['host1', 'command1']) - config = {'backend': 'direct'} - isatty.return_value = True - mocked_raw_input.return_value = 'y' - self.assertListEqual(cli.get_hosts(args, config), ['host1']) +def test_parse_config_invalid(): + """A CuminError is raised if the configuration cannot be parsed.""" + invalid_yaml = '\n'.join(( + 'foo:', + ' bar: baz', + ' - foobar', + )) + tmpfile, tmpfilepath = tempfile.mkstemp(suffix='config.yaml', prefix='cumin', text=True) + os.write(tmpfile, invalid_yaml) - mocked_raw_input.return_value = 'n' - with self.assertRaises(cli.KeyboardInterruptError): - cli.get_hosts(args, config) + with pytest.raises(CuminError, match='Unable to parse configuration file'): + cli.parse_config(tmpfilepath) - mocked_raw_input.return_value = 'invalid_answer' - with self.assertRaises(cli.KeyboardInterruptError): - cli.get_hosts(args, config) - mocked_raw_input.return_value = '' - with self.assertRaises(cli.KeyboardInterruptError): - cli.get_hosts(args, config) +@mock.patch('cumin.cli.stderr') +@mock.patch('cumin.cli.raw_input') +@mock.patch('cumin.cli.sys.stdout.isatty') +@mock.patch('cumin.cli.logger') +def test_sigint_handler(logging, isatty, mocked_raw_input, stderr): # pylint: disable=unused-argument + """Calling the SIGINT handler should raise KeyboardInterrupt or not based on tty and answer.""" + # Signal handler called without a tty + isatty.return_value = False + with pytest.raises(cli.KeyboardInterruptError): + cli.sigint_handler(1, None) - self.assertTrue(stderr.called) + # Signal handler called with a tty + isatty.return_value = True + with pytest.raises(cli.KeyboardInterruptError): + cli.sigint_handler(1, None) - @mock.patch('cumin.cli.stderr') - @mock.patch('cumin.cli.sys.stdout.isatty') - def test_get_hosts_no_tty_ko(self, isatty, stderr): - """Calling get_hosts() without a TTY should raise RuntimeError if --dry-run or --force are not specified.""" - args = cli.parse_args(argv=['host1', 'command1']) - config = {'backend': 'direct'} - isatty.return_value = False - with self.assertRaisesRegexp(CuminError, 'Not in a TTY but neither DRY-RUN nor FORCE mode were specified'): - cli.get_hosts(args, config) - self.assertTrue(stderr.called) + # # Signal handler called with a tty, answered 'y' + # isatty.return_value = True + # mocked_raw_input.return_value = 'y' + # with pytest.raises(cli.KeyboardInterruptError): + # cli.sigint_handler(1, None) + # + # # Signal handler called with a tty, answered 'n' + # isatty.return_value = True + # mocked_raw_input.return_value = 'n' + # assert cli.sigint_handler(1, None) is None + # + # # Signal handler called with a tty, answered 'invalid_answer' + # isatty.return_value = True + # mocked_raw_input.return_value = 'invalid_answer' + # with pytest.raises(cli.KeyboardInterruptError): + # cli.sigint_handler(1, None) + # + # # Signal handler called with a tty, empty answer + # isatty.return_value = True + # mocked_raw_input.return_value = '' + # with pytest.raises(cli.KeyboardInterruptError): + # cli.sigint_handler(1, None) - @mock.patch('cumin.cli.stderr') - @mock.patch('cumin.cli.sys.stdout.isatty') - def test_get_hosts_no_tty_dry_run(self, isatty, stderr): - """Calling get_hosts() with or without a TTY with --dry-run should return an empty list.""" - args = cli.parse_args(argv=['--dry-run', 'host1', 'command1']) - config = {'backend': 'direct'} - self.assertListEqual(cli.get_hosts(args, config), []) - isatty.return_value = True - self.assertListEqual(cli.get_hosts(args, config), []) - self.assertTrue(stderr.called) - @mock.patch('cumin.cli.stderr') - @mock.patch('cumin.cli.sys.stdout.isatty') - def test_get_hosts_no_tty_force(self, isatty, stderr): - """Calling get_hosts() with or without a TTY with --force should return the list of hosts.""" - args = cli.parse_args(argv=['--force', 'host1', 'command1']) - config = {'backend': 'direct'} - self.assertListEqual(cli.get_hosts(args, config), ['host1']) - isatty.return_value = True - self.assertListEqual(cli.get_hosts(args, config), ['host1']) - self.assertTrue(stderr.called) +@mock.patch('cumin.cli.tqdm') +def test_stderr(tqdm): + """Calling stderr() should call tqdm.write().""" + cli.stderr('message') + assert tqdm.write.called - @mock.patch('cumin.cli.Transport') - @mock.patch('cumin.cli.stderr') - def test_run(self, stderr, transport): - """Calling run() should query the hosts and execute the commands on the transport.""" - args = cli.parse_args(argv=['--force', 'host1', 'command1']) - config = {'backend': 'direct', 'transport': 'clustershell'} - cli.run(args, config) - transport.new.assert_called_once_with(config, cli.logger) - self.assertTrue(stderr.called) + +@mock.patch('cumin.cli.stderr') +@mock.patch('cumin.cli.raw_input') +@mock.patch('cumin.cli.sys.stdout.isatty') +def test_get_hosts_ok(isatty, mocked_raw_input, stderr): + """Calling get_hosts() should query the backend and return the list of hosts.""" + args = cli.parse_args(argv=['host1', 'command1']) + config = {'backend': 'direct'} + isatty.return_value = True + + mocked_raw_input.return_value = 'y' + assert cli.get_hosts(args, config) == ['host1'] + + mocked_raw_input.return_value = 'n' + with pytest.raises(cli.KeyboardInterruptError): + cli.get_hosts(args, config) + + mocked_raw_input.return_value = 'invalid_answer' + with pytest.raises(cli.KeyboardInterruptError): + cli.get_hosts(args, config) + + mocked_raw_input.return_value = '' + with pytest.raises(cli.KeyboardInterruptError): + cli.get_hosts(args, config) + + assert stderr.called + + +@mock.patch('cumin.cli.stderr') +@mock.patch('cumin.cli.sys.stdout.isatty') +def test_get_hosts_no_tty_ko(isatty, stderr): + """Calling get_hosts() without a TTY should raise RuntimeError if --dry-run or --force are not specified.""" + args = cli.parse_args(argv=['host1', 'command1']) + config = {'backend': 'direct'} + isatty.return_value = False + with pytest.raises(CuminError, match='Not in a TTY but neither DRY-RUN nor FORCE mode were specified'): + cli.get_hosts(args, config) + assert stderr.called + + +@mock.patch('cumin.cli.stderr') +@mock.patch('cumin.cli.sys.stdout.isatty') +def test_get_hosts_no_tty_dry_run(isatty, stderr): + """Calling get_hosts() with or without a TTY with --dry-run should return an empty list.""" + args = cli.parse_args(argv=['--dry-run', 'host1', 'command1']) + config = {'backend': 'direct'} + assert cli.get_hosts(args, config) == [] + isatty.return_value = True + assert cli.get_hosts(args, config) == [] + assert stderr.called + + +@mock.patch('cumin.cli.stderr') +@mock.patch('cumin.cli.sys.stdout.isatty') +def test_get_hosts_no_tty_force(isatty, stderr): + """Calling get_hosts() with or without a TTY with --force should return the list of hosts.""" + args = cli.parse_args(argv=['--force', 'host1', 'command1']) + config = {'backend': 'direct'} + assert cli.get_hosts(args, config) == ['host1'] + isatty.return_value = True + assert cli.get_hosts(args, config) == ['host1'] + assert stderr.called + + +@mock.patch('cumin.cli.Transport') +@mock.patch('cumin.cli.stderr') +def test_run(stderr, transport): + """Calling run() should query the hosts and execute the commands on the transport.""" + args = cli.parse_args(argv=['--force', 'host1', 'command1']) + config = {'backend': 'direct', 'transport': 'clustershell'} + cli.run(args, config) + transport.new.assert_called_once_with(config, cli.logger) + assert stderr.called diff --git a/cumin/tests/unit/test_grammar.py b/cumin/tests/unit/test_grammar.py index dfd77e9..2c78319 100644 --- a/cumin/tests/unit/test_grammar.py +++ b/cumin/tests/unit/test_grammar.py @@ -1,7 +1,5 @@ """Grammar tests.""" -import unittest - from cumin.grammar import grammar from cumin.tests import get_fixture @@ -13,27 +11,27 @@ return token, expected -class TestGrammar(unittest.TestCase): - """Grammar class tests.""" +def test_valid_strings(): + """Run quick pyparsing test over valid grammar strings.""" + results = grammar.runTests(get_fixture('valid_grammars.txt', as_string=True)) + assert results[0] - def test_valid_strings(self): - """Run quick pyparsing test over valid grammar strings.""" - results = grammar.runTests(get_fixture('valid_grammars.txt', as_string=True)) - self.assertTrue(results[0]) - def test_invalid_strings(self): - """Run quick pyparsing test over invalid grammar strings.""" - results = grammar.runTests(get_fixture('invalid_grammars.txt', as_string=True), failureTests=True) - self.assertTrue(results[0]) +def test_invalid_strings(): + """Run quick pyparsing test over invalid grammar strings.""" + results = grammar.runTests(get_fixture('invalid_grammars.txt', as_string=True), failureTests=True) + assert results[0] - def test_single_category_key_token(self): - """A valid single token with a category that has key is properly parsed and interpreted.""" - token, expected = _get_category_key_token() - parsed = grammar.parseString(token, parseAll=True) - self.assertDictEqual(parsed[0].asDict(), expected) - def test_hosts_selection(self): - """A host selection is properly parsed and interpreted.""" - hosts = {'hosts': 'host[10-20,30-40].domain'} - parsed = grammar.parseString(hosts['hosts'], parseAll=True) - self.assertDictEqual(parsed[0].asDict(), hosts) +def test_single_category_key_token(): + """A valid single token with a category that has key is properly parsed and interpreted.""" + token, expected = _get_category_key_token() + parsed = grammar.parseString(token, parseAll=True) + assert parsed[0].asDict() == expected + + +def test_hosts_selection(): + """A host selection is properly parsed and interpreted.""" + hosts = {'hosts': 'host[10-20,30-40].domain'} + parsed = grammar.parseString(hosts['hosts'], parseAll=True) + assert parsed[0].asDict() == hosts diff --git a/cumin/tests/unit/test_query.py b/cumin/tests/unit/test_query.py index 7c2b88d..4dbd2ee 100644 --- a/cumin/tests/unit/test_query.py +++ b/cumin/tests/unit/test_query.py @@ -3,9 +3,9 @@ import logging import os import pkgutil -import unittest import mock +import pytest from ClusterShell.NodeSet import NodeSet from pyparsing import ParseException @@ -28,12 +28,14 @@ return mock.MagicMock(spec_set=BaseQuery) -class TestQuery(unittest.TestCase): +class TestQuery(object): """Query factory class tests.""" + + # pylint: disable=no-self-use def test_invalid_backend(self): """Passing an invalid backend should raise RuntimeError.""" - with self.assertRaisesRegexp(RuntimeError, r"ImportError\('No module named non_existent_backend'"): + with pytest.raises(RuntimeError, match=r"ImportError\('No module named non_existent_backend'"): Query.new({'backend': 'non_existent_backend'}) def test_missing_query_class(self): @@ -41,17 +43,17 @@ module = mock.MagicMock() del module.query_class with mock.patch('importlib.import_module', lambda _: module): - with self.assertRaisesRegexp(RuntimeError, r"AttributeError\('query_class'"): + with pytest.raises(RuntimeError, match=r"AttributeError\('query_class'"): Query.new({'backend': 'invalid_backend'}) def test_valid_backend(self): """Passing a valid backend should return an instance of BaseQuery.""" backends = [name for _, name, _ in pkgutil.iter_modules([os.path.join('cumin', 'backends')])] for backend in backends: - self.assertIsInstance(Query.new({'backend': backend}), BaseQuery) + assert isinstance(Query.new({'backend': backend}), BaseQuery) -class TestQueryBuilder(unittest.TestCase): +class TestQueryBuilder(object): """Class QueryBuilder tests.""" query_string = 'host1 or (not F:key1 = value and R:key2 ~ regex) or host2' @@ -62,10 +64,10 @@ def test_instantiation(self): """Class QueryBuilder should create an instance of a query_class for the given backend.""" query_builder = QueryBuilder(self.query_string, self.config) - self.assertIsInstance(query_builder, QueryBuilder) - self.assertIsInstance(query_builder.query, BaseQuery) - self.assertEqual(query_builder.query_string, self.query_string) - self.assertEqual(query_builder.level, 0) + assert isinstance(query_builder, QueryBuilder) + assert isinstance(query_builder.query, BaseQuery) + assert query_builder.query_string == self.query_string + assert query_builder.level == 0 @mock.patch('cumin.query.Query', QueryFactory) def test_build_valid(self): @@ -94,12 +96,12 @@ def test_build_invalid(self): """QueryBuilder.build() should raise ParseException for an invalid query.""" query_builder = QueryBuilder(self.invalid_query_string, self.config) - with self.assertRaisesRegexp(ParseException, r"Expected end of text"): + with pytest.raises(ParseException, match='Expected end of text'): query_builder.build() @mock.patch('cumin.query.Query', QueryFactory) def test__parse_token(self): """QueryBuilder._parse_token() should raise RuntimeError for an invalid token.""" query_builder = QueryBuilder(self.invalid_query_string, self.config) - with self.assertRaisesRegexp(RuntimeError, r"Invalid query string syntax"): + with pytest.raises(RuntimeError, match='Invalid query string syntax'): query_builder._parse_token('invalid_token') # pylint: disable=protected-access diff --git a/cumin/tests/unit/test_transport.py b/cumin/tests/unit/test_transport.py index 1aa7bd4..2b10ed6 100644 --- a/cumin/tests/unit/test_transport.py +++ b/cumin/tests/unit/test_transport.py @@ -2,32 +2,31 @@ import os import pkgutil -import unittest +import pytest import mock from cumin.transport import Transport from cumin.transports import BaseWorker -class TestTransport(unittest.TestCase): - """Transport factory class tests.""" +def test_invalid_transport(): + """Passing an invalid transport should raise RuntimeError.""" + with pytest.raises(RuntimeError, match=r"ImportError\('No module named non_existent_transport'"): + Transport.new({'transport': 'non_existent_transport'}) - def test_invalid_transport(self): - """Passing an invalid transport should raise RuntimeError.""" - with self.assertRaisesRegexp(RuntimeError, r"ImportError\('No module named non_existent_transport'"): - Transport.new({'transport': 'non_existent_transport'}) - def test_missing_worker_class(self): - """Passing a transport without a defined worker_class should raise RuntimeError.""" - module = mock.MagicMock() - del module.worker_class - with mock.patch('importlib.import_module', lambda _: module): - with self.assertRaisesRegexp(RuntimeError, r"AttributeError\('worker_class'"): - Transport.new({'transport': 'invalid_transport'}) +def test_missing_worker_class(): + """Passing a transport without a defined worker_class should raise RuntimeError.""" + module = mock.MagicMock() + del module.worker_class + with mock.patch('importlib.import_module', lambda _: module): + with pytest.raises(RuntimeError, match=r"AttributeError\('worker_class'"): + Transport.new({'transport': 'invalid_transport'}) - def test_valid_transport(self): - """Passing a valid transport should return an instance of BaseWorker.""" - transports = [name for _, name, _ in pkgutil.iter_modules([os.path.join('cumin', 'transports')])] - for transport in transports: - self.assertIsInstance(Transport.new({'transport': transport}), BaseWorker) + +def test_valid_transport(): + """Passing a valid transport should return an instance of BaseWorker.""" + transports = [name for _, name, _ in pkgutil.iter_modules([os.path.join('cumin', 'transports')])] + for transport in transports: + assert isinstance(Transport.new({'transport': transport}), BaseWorker) diff --git a/cumin/tests/unit/transports/test_clustershell.py b/cumin/tests/unit/transports/test_clustershell.py index 180e8ac..b6f41b5 100644 --- a/cumin/tests/unit/transports/test_clustershell.py +++ b/cumin/tests/unit/transports/test_clustershell.py @@ -1,38 +1,32 @@ """ClusterShell transport tests.""" -# pylint: disable=invalid-name,no-member,protected-access -import unittest +# pylint: disable=invalid-name,no-member,protected-access,attribute-defined-outside-init import mock +import pytest from cumin.transports import BaseWorker, Command, clustershell, State, WorkerError -class TestNode(unittest.TestCase): - """Node class tests.""" - - def test_instantiation(self): - """Default values should be set when a Node instance is created.""" - node = clustershell.Node('name', [Command('command1'), Command('command2')]) - self.assertEqual(node.running_command_index, -1) - self.assertIsInstance(node.state, State) +def test_node_class_instantiation(): + """Default values should be set when a Node instance is created.""" + node = clustershell.Node('name', [Command('command1'), Command('command2')]) + assert node.running_command_index == -1 + assert isinstance(node.state, State) -class TestWorkerClass(unittest.TestCase): - """ClusterShell backend worker_class test class.""" - - @mock.patch('cumin.transports.clustershell.Task.task_self') - def test_worker_class(self, task_self): - """An instance of worker_class should be an instance of BaseWorker.""" - worker = clustershell.worker_class({}) - self.assertIsInstance(worker, BaseWorker) - task_self.assert_called_once_with() +@mock.patch('cumin.transports.clustershell.Task.task_self') +def test_worker_class(task_self): + """An instance of worker_class should be an instance of BaseWorker.""" + worker = clustershell.worker_class({}) + assert isinstance(worker, BaseWorker) + task_self.assert_called_once_with() -class TestClusterShellWorker(unittest.TestCase): +class TestClusterShellWorker(object): """ClusterShell backend worker test class.""" @mock.patch('cumin.transports.clustershell.Task.task_self') - def setUp(self, task_self): # pylint: disable=arguments-differ + def setup_method(self, _, task_self): # pylint: disable=arguments-differ """Initialize default properties and instances""" self.config = { 'clustershell': { @@ -57,7 +51,7 @@ def test_instantiation(self, task_self): """An instance of ClusterShellWorker should be an instance of BaseWorker and initialize ClusterShell.""" worker = clustershell.ClusterShellWorker(self.config) - self.assertIsInstance(worker, BaseWorker) + assert isinstance(worker, BaseWorker) task_self.assert_called_once_with() worker.task.set_info.assert_has_calls( [mock.call('fanout', 3), @@ -69,7 +63,7 @@ self.worker.execute() self.worker.task.shell.assert_called_once_with( 'command1', nodes=self.nodes_set, handler=self.worker._handler_instance, timeout=None) - self.assertTrue(clustershell.DEFAULT_HANDLERS['sync'].called) + assert clustershell.DEFAULT_HANDLERS['sync'].called def test_execute_default_async_handler(self): """Calling execute() in async mode without event handler should use the default async event handler.""" @@ -77,7 +71,7 @@ self.worker.execute() self.worker.task.shell.assert_called_once_with( 'command1', nodes=self.nodes_set, handler=self.worker._handler_instance, timeout=None) - self.assertTrue(clustershell.DEFAULT_HANDLERS['async'].called) + assert clustershell.DEFAULT_HANDLERS['async'].called def test_execute_timeout(self): """Calling execute() and let the global timeout expire should call on_timeout.""" @@ -90,7 +84,7 @@ """Calling execute() using a custom handler should call ClusterShell task with the custom event handler.""" self.worker.handler = ConcreteBaseEventHandler self.worker.execute() - self.assertIsInstance(self.worker._handler_instance, ConcreteBaseEventHandler) + assert isinstance(self.worker._handler_instance, ConcreteBaseEventHandler) self.worker.task.shell.assert_called_once_with( 'command1', nodes=self.nodes_set, handler=self.worker._handler_instance, timeout=None) @@ -98,17 +92,17 @@ """Calling execute() without commands should return without doing anything.""" self.worker.commands = [] self.worker.execute() - self.assertFalse(self.worker.task.shell.called) + assert not self.worker.task.shell.called def test_execute_one_command_no_mode(self): """Calling execute() with only one command without mode should raise exception.""" self.worker.commands = [self.commands[0]] - with self.assertRaisesRegexp(RuntimeError, 'An EventHandler is mandatory.'): + with pytest.raises(RuntimeError, match=r'An EventHandler is mandatory\.'): self.worker.execute() def test_execute_wrong_mode(self): """Calling execute() without setting the mode with multiple commands should raise RuntimeError.""" - with self.assertRaisesRegexp(RuntimeError, r'An EventHandler is mandatory.'): + with pytest.raises(RuntimeError, match=r'An EventHandler is mandatory\.'): self.worker.execute() def test_execute_batch_size(self): @@ -130,14 +124,14 @@ output = None for nodes, output in self.worker.get_results(): pass - self.assertEqual(str(nodes), 'node[90-92]') - self.assertEqual(output, 'output 9') + assert str(nodes) == 'node[90-92]' + assert output == 'output 9' def test_handler_getter(self): """Access to handler property should return the handler class or None""" - self.assertIsNone(self.worker.handler) + assert self.worker.handler is None self.worker.handler = 'sync' - self.assertEqual(self.worker._handler, clustershell.DEFAULT_HANDLERS['sync']) + assert self.worker._handler == clustershell.DEFAULT_HANDLERS['sync'] def test_handler_setter_invalid(self): """Raise WorkerError if trying to set it to an invalid class or value""" @@ -146,26 +140,26 @@ pass - with self.assertRaisesRegexp(WorkerError, r'handler must be one of'): + with pytest.raises(WorkerError, match='handler must be one of'): self.worker.handler = 'invalid-handler' - with self.assertRaisesRegexp(WorkerError, r'handler must be one of'): + with pytest.raises(WorkerError, match='handler must be one of'): self.worker.handler = InvalidClass def test_handler_setter_default_sync(self): """Should set the handler to the default handler for the sync mode""" self.worker.handler = 'sync' - self.assertEqual(self.worker._handler, clustershell.DEFAULT_HANDLERS['sync']) + assert self.worker._handler == clustershell.DEFAULT_HANDLERS['sync'] def test_handler_setter_default_async(self): """Should set the handler to the default handler for the async mode""" self.worker.handler = 'async' - self.assertEqual(self.worker._handler, clustershell.DEFAULT_HANDLERS['async']) + assert self.worker._handler == clustershell.DEFAULT_HANDLERS['async'] def test_handler_setter_custom(self): """Should set the handler to the given custom class that inherit from BaseEventHandler""" self.worker.handler = ConcreteBaseEventHandler - self.assertEqual(self.worker._handler, ConcreteBaseEventHandler) + assert self.worker._handler == ConcreteBaseEventHandler @staticmethod def iter_buffers(): @@ -174,10 +168,10 @@ yield 'output {}'.format(i), ['node{}0'.format(i), 'node{}1'.format(i), 'node{}2'.format(i)] -class TestBaseEventHandler(unittest.TestCase): +class TestBaseEventHandler(object): """BaseEventHandler test class.""" - def setUp(self, *args): # pylint: disable=arguments-differ + def setup_method(self, *args): # pylint: disable=arguments-differ """Initialize default properties and instances.""" self.nodes = ['node1', 'node2'] self.commands = [Command('command1', ok_codes=[0, 100]), Command('command2', timeout=5)] @@ -192,7 +186,7 @@ def test_close(self, colorama): """Calling close should raise NotImplementedError.""" self.handler = clustershell.BaseEventHandler(self.nodes, self.commands) - with self.assertRaises(NotImplementedError): + with pytest.raises(NotImplementedError): self.handler.close(self.worker) colorama.init.assert_called_once_with() @@ -215,9 +209,9 @@ @mock.patch('cumin.transports.clustershell.colorama') @mock.patch('cumin.transports.clustershell.tqdm') - def setUp(self, tqdm, colorama): # pylint: disable=arguments-differ + def setup_method(self, _, tqdm, colorama): # pylint: disable=arguments-differ """Initialize default properties and instances.""" - super(TestConcreteBaseEventHandler, self).setUp() + super(TestConcreteBaseEventHandler, self).setup_method() self.handler = ConcreteBaseEventHandler( self.nodes, self.commands, batch_size=len(self.nodes), batch_sleep=0.0, first_batch=self.nodes) self.worker.eh = self.handler @@ -226,7 +220,7 @@ def test_instantiation(self): """An instance of ConcreteBaseEventHandler should be an instance of BaseEventHandler and initialize colorama.""" - self.assertListEqual(sorted(self.handler.nodes.keys()), self.nodes) + assert sorted(self.handler.nodes.keys()) == self.nodes self.colorama.init.assert_called_once_with() @mock.patch('cumin.transports.clustershell.tqdm') @@ -238,19 +232,19 @@ self.worker.task.num_timeout.return_value = 1 self.worker.task.iter_keys_timeout.return_value = [self.nodes[0]] - self.assertFalse(self.handler.global_timedout) + assert not self.handler.global_timedout self.handler.on_timeout(self.worker.task) - self.assertTrue(self.handler.pbar_ko.update.called) - self.assertTrue(self.handler.global_timedout) - self.assertTrue(tqdm.write.called) + assert self.handler.pbar_ko.update.called + assert self.handler.global_timedout + assert tqdm.write.called def test_ev_pickup(self): """Calling ev_pickup() should set the state of the current node to running.""" for node in self.nodes: self.worker.current_node = node self.handler.ev_pickup(self.worker) - self.assertListEqual([node for node in self.worker.eh.nodes.itervalues() if node.state.is_running], - self.worker.eh.nodes.values()) + running_nodes = [node for node in self.worker.eh.nodes.itervalues() if node.state.is_running] + assert running_nodes == self.worker.eh.nodes.values() @mock.patch('cumin.transports.clustershell.tqdm') def test_ev_read_many_hosts(self, tqdm): @@ -259,7 +253,7 @@ self.worker.current_node = node self.worker.current_msg = 'Node output' self.handler.ev_read(self.worker) - self.assertFalse(tqdm.write.called) + assert not tqdm.write.called @mock.patch('cumin.transports.clustershell.tqdm') def test_ev_read_single_host(self, tqdm): @@ -282,10 +276,10 @@ self.worker.current_node = node self.handler.ev_pickup(self.worker) - self.assertEqual(self.handler.counters['timeout'], 0) + assert self.handler.counters['timeout'] == 0 self.worker.task.num_timeout.return_value = 2 self.handler.ev_timeout(self.worker) - self.assertEqual(self.handler.counters['timeout'], 2) + assert self.handler.counters['timeout'] == 2 class TestSyncEventHandler(TestBaseEventHandler): @@ -294,9 +288,9 @@ @mock.patch('cumin.transports.clustershell.logging') @mock.patch('cumin.transports.clustershell.colorama') @mock.patch('cumin.transports.clustershell.tqdm') - def setUp(self, tqdm, colorama, logger): # pylint: disable=arguments-differ + def setup_method(self, _, tqdm, colorama, logger): # pylint: disable=arguments-differ """Initialize default properties and instances.""" - super(TestSyncEventHandler, self).setUp() + super(TestSyncEventHandler, self).setup_method() self.handler = clustershell.SyncEventHandler( self.nodes, self.commands, success_threshold=1, batch_size=len(self.nodes), batch_sleep=0, logger=None, first_batch=self.nodes) @@ -307,14 +301,14 @@ def test_instantiation(self): """An instance of SyncEventHandler should be an instance of BaseEventHandler.""" - self.assertIsInstance(self.handler, clustershell.BaseEventHandler) + assert isinstance(self.handler, clustershell.BaseEventHandler) @mock.patch('cumin.transports.clustershell.tqdm') def test_start_command_no_schedule(self, tqdm): """Calling start_command() should reset the success counter and initialize the progress bars.""" self.handler.start_command() - self.assertTrue(tqdm.called) - self.assertEqual(self.handler.counters['success'], 0) + assert tqdm.called + assert self.handler.counters['success'] == 0 @mock.patch('cumin.transports.clustershell.Task.task_self') @mock.patch('cumin.transports.clustershell.tqdm') @@ -327,24 +321,24 @@ node.state.update(clustershell.State.pending) self.handler.start_command(schedule=True) - self.assertTrue(tqdm.called) - self.assertEqual(self.handler.counters['success'], 0) - self.assertListEqual(sorted(node.name for node in self.handler.nodes.itervalues() if node.state.is_scheduled), - sorted(['node1', 'node2'])) - self.assertTrue(task_self.called) + assert tqdm.called + assert self.handler.counters['success'] == 0 + scheduled_nodes = sorted(node.name for node in self.handler.nodes.itervalues() if node.state.is_scheduled) + assert scheduled_nodes == sorted(['node1', 'node2']) + assert task_self.called @mock.patch('cumin.transports.clustershell.tqdm') def test_end_command(self, tqdm): """Calling end_command() should wrap up the command execution.""" - self.assertFalse(self.handler.end_command()) + assert not self.handler.end_command() self.handler.counters['success'] = 2 - self.assertTrue(self.handler.end_command()) + assert self.handler.end_command() self.handler.kwargs['success_threshold'] = 0.5 self.handler.counters['success'] = 1 - self.assertTrue(self.handler.end_command()) + assert self.handler.end_command() self.handler.current_command_index = 1 - self.assertFalse(self.handler.end_command()) - self.assertTrue(tqdm.write.called) + assert not self.handler.end_command() + assert tqdm.write.called @mock.patch('cumin.transports.clustershell.tqdm') def test_on_timeout(self, tqdm): @@ -352,7 +346,7 @@ self.worker.task.num_timeout.return_value = 0 self.worker.task.iter_keys_timeout.return_value = [] self.handler.on_timeout(self.worker.task) - self.assertTrue(tqdm.write.called) + assert tqdm.write.called def test_ev_timer(self): """Calling ev_timer() should schedule the execution of the next node/command.""" @@ -365,9 +359,9 @@ self.worker.current_rc = 100 self.handler.ev_pickup(self.worker) self.handler.ev_hup(self.worker) - self.assertTrue(self.handler.pbar_ok.update.called) - self.assertFalse(timer.called) - self.assertTrue(self.handler.nodes[self.worker.current_node].state.is_success) + assert self.handler.pbar_ok.update.called + assert not timer.called + assert self.handler.nodes[self.worker.current_node].state.is_success @mock.patch('cumin.transports.clustershell.Task.Task.timer') def test_ev_hup_ko(self, timer): @@ -375,16 +369,16 @@ self.worker.current_rc = 1 self.handler.ev_pickup(self.worker) self.handler.ev_hup(self.worker) - self.assertTrue(self.handler.pbar_ko.update.called) - self.assertFalse(timer.called) - self.assertTrue(self.handler.nodes[self.worker.current_node].state.is_failed) + assert self.handler.pbar_ko.update.called + assert not timer.called + assert self.handler.nodes[self.worker.current_node].state.is_failed @mock.patch('cumin.transports.clustershell.tqdm') def test_close(self, tqdm): # pylint: disable=arguments-differ """Calling close should print the report when needed.""" self.handler.current_command_index = 2 self.handler.close(self.worker) - self.assertTrue(tqdm.write.called) + assert tqdm.write.called class TestAsyncEventHandler(TestBaseEventHandler): @@ -393,9 +387,9 @@ @mock.patch('cumin.transports.clustershell.logging') @mock.patch('cumin.transports.clustershell.colorama') @mock.patch('cumin.transports.clustershell.tqdm') - def setUp(self, tqdm, colorama, logger): # pylint: disable=arguments-differ + def setup_method(self, _, tqdm, colorama, logger): # pylint: disable=arguments-differ """Initialize default properties and instances.""" - super(TestAsyncEventHandler, self).setUp() + super(TestAsyncEventHandler, self).setup_method() self.handler = clustershell.AsyncEventHandler(self.nodes, self.commands) self.worker.eh = self.handler self.tqdm = tqdm @@ -404,8 +398,8 @@ def test_instantiation(self): """An instance of AsyncEventHandler should be an instance of BaseEventHandler and initialize progress bars.""" - self.assertIsInstance(self.handler, clustershell.BaseEventHandler) - self.assertTrue(self.handler.pbar_ok.refresh.called) + assert isinstance(self.handler, clustershell.BaseEventHandler) + assert self.handler.pbar_ok.refresh.called def test_ev_hup_ok(self): """Calling ev_hup with a worker that has zero exit status should enqueue the next command.""" @@ -422,8 +416,8 @@ self.handler.ev_pickup(self.worker) self.worker.current_rc = 0 self.handler.ev_hup(self.worker) - self.assertEqual(self.handler.counters['success'], 1) - self.assertTrue(self.handler.pbar_ok.update.called) + assert self.handler.counters['success'] == 1 + assert self.handler.pbar_ok.update.called def test_ev_hup_ko(self): """Calling ev_hup with a worker that has non-zero exit status should not enqueue the next command.""" @@ -432,7 +426,7 @@ self.handler.ev_pickup(self.worker) self.worker.current_rc = 1 self.handler.ev_hup(self.worker) - self.assertTrue(self.handler.pbar_ko.update.called) + assert self.handler.pbar_ko.update.called def test_ev_timer(self): """Calling ev_timer() should schedule the execution of the next node/command.""" @@ -445,5 +439,5 @@ self.worker.task.iter_buffers = TestClusterShellWorker.iter_buffers self.worker.num_timeout.return_value = 0 self.handler.close(self.worker) - self.assertTrue(self.handler.pbar_ok.close.called) - self.assertTrue(tqdm.write.called) + assert self.handler.pbar_ok.close.called + assert tqdm.write.called diff --git a/cumin/tests/unit/transports/test_init.py b/cumin/tests/unit/transports/test_init.py index 5911c2e..970a969 100644 --- a/cumin/tests/unit/transports/test_init.py +++ b/cumin/tests/unit/transports/test_init.py @@ -1,8 +1,7 @@ """Transport tests.""" -# pylint: disable=protected-access -import unittest - +# pylint: disable=protected-access,no-self-use import mock +import pytest import cumin # noqa: F401 (dynamically used in TestCommand) @@ -30,11 +29,12 @@ self._handler = value -class TestCommand(unittest.TestCase): +class TestCommand(object): """Command class tests.""" - def setUp(self): + def setup_method(self, _): """Initialize test commands.""" + # pylint: disable=attribute-defined-outside-init command_with_options = r'command --with "options" -a -n -d params with\ spaces' self.same_command_a = transports.Command(command_with_options) self.same_command_b = r"command --with 'options' -a -n -d params with\ spaces" @@ -62,83 +62,84 @@ def test_instantiation(self): """A new Command instance should set the command property to the given command.""" for command in self.commands: - self.assertIsInstance(command['obj'], transports.Command) - self.assertEqual(command['obj'].command, command['command']) - self.assertEqual(command['obj']._timeout, command.get('timeout', None)) - self.assertEqual(command['obj']._ok_codes, command.get('ok_codes', None)) + assert isinstance(command['obj'], transports.Command) + assert command['obj'].command == command['command'] + assert command['obj']._timeout == command.get('timeout', None) + assert command['obj']._ok_codes == command.get('ok_codes', None) def test_repr(self): """A repr of a Command should allow to instantiate an instance with the same properties.""" for command in self.commands: # Bandit and pylint would require to use ast.literal_eval, but it will not work with objects command_instance = eval(repr(command['obj'])) # nosec pylint: disable=eval-used - self.assertIsInstance(command_instance, transports.Command) - self.assertEqual(repr(command_instance), repr(command['obj'])) - self.assertEqual(command_instance.command, command['obj'].command) - self.assertEqual(command_instance._timeout, command['obj']._timeout) - self.assertEqual(command_instance._ok_codes, command['obj']._ok_codes) + assert isinstance(command_instance, transports.Command) + assert repr(command_instance) == repr(command['obj']) + assert command_instance.command == command['obj'].command + assert command_instance._timeout == command['obj']._timeout + assert command_instance._ok_codes == command['obj']._ok_codes def test_str(self): """A cast to string of a Command should return its command.""" for command in self.commands: - self.assertEqual(str(command['obj']), command['command']) + assert str(command['obj']) == command['command'] def test_eq(self): """A Command instance can be compared to another or to a string with the equality operator.""" for command in self.commands: - self.assertEqual(command['obj'], transports.Command( - command['command'], timeout=command.get('timeout', None), ok_codes=command.get('ok_codes', None))) + assert command['obj'] == transports.Command( + command['command'], timeout=command.get('timeout', None), ok_codes=command.get('ok_codes', None)) if command.get('timeout', None) is None and command.get('ok_codes', None) is None: - self.assertEqual(command['obj'], command['command']) + assert command['obj'] == command['command'] - with self.assertRaisesRegexp(ValueError, 'Unable to compare instance of'): + with pytest.raises(ValueError, match='Unable to compare instance of'): command['obj'] == 1 # pylint: disable=pointless-statement - self.assertEqual(self.same_command_a, self.same_command_b) + assert self.same_command_a == self.same_command_b def test_ne(self): """A Command instance can be compared to another or to a string with the inequality operator.""" # Just a basic test, all the cases are covered by the test_eq test for command in self.commands: # Different command with same or differnt properties - self.assertNotEqual(command['obj'], transports.Command( - self.different_command, timeout=command.get('timeout', None), ok_codes=command.get('ok_codes', None))) - self.assertNotEqual(command['obj'], transports.Command( - self.different_command, timeout=999, ok_codes=command.get('ok_codes', None))) - self.assertNotEqual(command['obj'], transports.Command( - self.different_command, timeout=command.get('timeout', None), ok_codes=[99])) - self.assertNotEqual(command['obj'], transports.Command(self.different_command, timeout=999, ok_codes=[99])) - self.assertNotEqual(command['obj'], self.different_command) + assert command['obj'] != transports.Command( + self.different_command, timeout=command.get('timeout', None), ok_codes=command.get('ok_codes', None)) + assert command['obj'] != transports.Command( + self.different_command, timeout=999, ok_codes=command.get('ok_codes', None)) + assert command['obj'] != transports.Command( + self.different_command, timeout=command.get('timeout', None), ok_codes=[99]) + assert command['obj'] != transports.Command(self.different_command, timeout=999, ok_codes=[99]) + assert command['obj'] != self.different_command # Same command, properties different - self.assertNotEqual(command['obj'], transports.Command( - command['command'], timeout=999, ok_codes=command.get('ok_codes', None))) - self.assertNotEqual(command['obj'], transports.Command( - command['command'], timeout=command.get('timeout', None), ok_codes=[99])) - self.assertNotEqual(command['obj'], transports.Command(command['command'], timeout=999, ok_codes=[99])) + assert command['obj'] != transports.Command( + command['command'], timeout=999, ok_codes=command.get('ok_codes', None)) + assert command['obj'] != transports.Command( + command['command'], timeout=command.get('timeout', None), ok_codes=[99]) + assert command['obj'] != transports.Command(command['command'], timeout=999, ok_codes=[99]) if command.get('timeout', None) is not None or command.get('ok_codes', None) is not None: - self.assertNotEqual(command['obj'], command['command']) + assert command['obj'] != command['command'] - with self.assertRaisesRegexp(ValueError, 'Unable to compare instance of'): + with pytest.raises(ValueError, match='Unable to compare instance of'): command['obj'] == 1 # pylint: disable=pointless-statement def test_timeout_getter(self): """Should return the timeout set, None otherwise.""" for command in self.commands: - self.assertAlmostEqual(command['obj'].timeout, command['obj']._timeout) + if command['obj'].timeout is not None and command['obj']._timeout is not None: + assert command['obj'].timeout == pytest.approx(command['obj']._timeout) def test_timeout_setter(self): """Should set the timeout to its value, converted to float if integer. Unset it if None is passed.""" command = transports.Command('command1') command.timeout = 1.0 - self.assertAlmostEqual(command._timeout, 1.0) + assert command._timeout == pytest.approx(1.0) command.timeout = None - self.assertIsNone(command._timeout) + assert command._timeout is None command.timeout = 1 - self.assertAlmostEqual(command._timeout, 1.0) - with self.assertRaisesRegexp(transports.WorkerError, 'timeout must be a positive float'): + assert command._timeout == pytest.approx(1.0) + with pytest.raises(transports.WorkerError, match='timeout must be a positive float'): command.timeout = -1.0 def test_ok_codes_getter(self): @@ -146,93 +147,91 @@ # Test empty list command = transports.Command('command1') command.ok_codes = [] - self.assertListEqual(command.ok_codes, []) + assert command.ok_codes == [] for command in self.commands: - self.assertListEqual(command['obj'].ok_codes, command.get('ok_codes', [0])) + assert command['obj'].ok_codes == command.get('ok_codes', [0]) def test_ok_codes_setter(self): """Should set the ok_codes to its value, unset it if None is passed.""" command = transports.Command('command1') - self.assertIsNone(command._ok_codes) + assert command._ok_codes is None for i in xrange(256): codes = [i] command.ok_codes = codes - self.assertListEqual(command._ok_codes, codes) + assert command._ok_codes == codes codes.insert(0, 0) command.ok_codes = codes - self.assertListEqual(command._ok_codes, codes) + assert command._ok_codes == codes command.ok_codes = None - self.assertIsNone(command._ok_codes) + assert command._ok_codes is None command.ok_codes = [] - self.assertListEqual(command._ok_codes, []) + assert command._ok_codes == [] - with self.assertRaisesRegexp(transports.WorkerError, r'ok_codes must be a list or None'): + with pytest.raises(transports.WorkerError, match='ok_codes must be a list or None'): command.ok_codes = 'invalid_value' - message_regex = r'must be a list of integers in the range' + message = 'must be a list of integers in the range' for i in (-1, 0.0, 100.0, 256, 'invalid_value'): codes = [i] - with self.assertRaisesRegexp(transports.WorkerError, message_regex): + with pytest.raises(transports.WorkerError, match=message): command.ok_codes = codes codes.insert(0, 0) - with self.assertRaisesRegexp(transports.WorkerError, message_regex): + with pytest.raises(transports.WorkerError, match=message): command.ok_codes = codes -class TestState(unittest.TestCase): +class TestState(object): """State class tests.""" def test_instantiation_no_init(self): """A new State without an init value should start in the pending state.""" state = transports.State() - self.assertEqual(state._state, transports.State.pending) + assert state._state == transports.State.pending def test_instantiation_init_ok(self): """A new State with a valid init value should start in this state.""" state = transports.State(init=transports.State.running) - self.assertEqual(state._state, transports.State.running) + assert state._state == transports.State.running def test_instantiation_init_ko(self): """A new State with an invalid init value should raise InvalidStateError.""" - with self.assertRaisesRegexp(transports.InvalidStateError, 'is not a valid state'): + with pytest.raises(transports.InvalidStateError, match='is not a valid state'): transports.State(init='invalid_state') def test_getattr_current(self): """Accessing the 'current' property should return the current state.""" - self.assertEqual(transports.State().current, transports.State.pending) + assert transports.State().current == transports.State.pending def test_getattr_is_valid_state(self): """Accessing a property named is_{a_valid_state_name} should return a boolean.""" state = transports.State(init=transports.State.failed) - self.assertFalse(state.is_pending) - self.assertFalse(state.is_scheduled) - self.assertFalse(state.is_running) - self.assertFalse(state.is_timeout) - self.assertFalse(state.is_success) - self.assertTrue(state.is_failed) + assert not state.is_pending + assert not state.is_scheduled + assert not state.is_running + assert not state.is_timeout + assert not state.is_success + assert state.is_failed def test_getattr_invalid_property(self): """Accessing a property with an invalid name should raise AttributeError.""" state = transports.State(init=transports.State.failed) - with self.assertRaisesRegexp(AttributeError, 'object has no attribute'): + with pytest.raises(AttributeError, match='object has no attribute'): state.invalid_property # pylint: disable=pointless-statement def test_repr(self): """A State repr should return its representation that allows to recreate the same State instance.""" - self.assertEqual( - repr(transports.State()), 'cumin.transports.State(init={state})'.format(state=transports.State.pending)) + assert repr(transports.State()) == 'cumin.transports.State(init={state})'.format(state=transports.State.pending) state = transports.State.running - self.assertEqual( - repr(transports.State(init=state)), 'cumin.transports.State(init={state})'.format(state=state)) + assert repr(transports.State(init=state)) == 'cumin.transports.State(init={state})'.format(state=state) def test_str(self): """A State string should return its string representation.""" - self.assertEqual(str(transports.State()), 'pending') - self.assertEqual(str(transports.State(init=transports.State.running)), 'running') + assert str(transports.State()) == 'pending' + assert str(transports.State(init=transports.State.running)) == 'running' def test_cmp_state(self): """Two State instance can be compared between each other.""" @@ -240,14 +239,14 @@ greater_state = transports.State(init=transports.State.failed) same_state = transports.State() - self.assertGreater(greater_state, state) - self.assertGreaterEqual(greater_state, state) - self.assertGreaterEqual(same_state, state) - self.assertLess(state, greater_state) - self.assertLessEqual(state, greater_state) - self.assertLessEqual(state, same_state) - self.assertEqual(state, same_state) - self.assertNotEqual(state, greater_state) + assert greater_state > state + assert greater_state >= state + assert same_state >= state + assert state < greater_state + assert state <= greater_state + assert state <= same_state + assert state == same_state + assert state != greater_state def test_cmp_int(self): """A State instance can be compared with integers.""" @@ -255,56 +254,56 @@ greater_state = transports.State.running same_state = transports.State.pending - self.assertGreater(greater_state, state) - self.assertGreaterEqual(greater_state, state) - self.assertGreaterEqual(same_state, state) - self.assertLess(state, greater_state) - self.assertLessEqual(state, greater_state) - self.assertLessEqual(state, same_state) - self.assertEqual(state, same_state) - self.assertNotEqual(state, greater_state) + assert greater_state > state + assert greater_state >= state + assert same_state >= state + assert state < greater_state + assert state <= greater_state + assert state <= same_state + assert state == same_state + assert state != greater_state def test_cmp_invalid(self): """Trying to compare a State instance with an invalid object should raise ValueError.""" state = transports.State() invalid_state = 'invalid_state' - with self.assertRaisesRegexp(ValueError, 'Unable to compare instance'): - self.assertEqual(state, invalid_state) + with pytest.raises(ValueError, match='Unable to compare instance'): + state == invalid_state # pylint: disable=pointless-statement def test_update_invalid_state(self): """Trying to update a State with an invalid value should raise ValueError.""" state = transports.State() - with self.assertRaisesRegexp(ValueError, 'State must be one of'): + with pytest.raises(ValueError, match='State must be one of'): state.update('invalid_state') def test_update_invalid_transition(self): """Trying to update a State with an invalid transition should raise StateTransitionError.""" state = transports.State() - with self.assertRaisesRegexp(transports.StateTransitionError, 'the allowed states are'): + with pytest.raises(transports.StateTransitionError, match='the allowed states are'): state.update(transports.State.failed) def test_update_ok(self): """Properly updating a State should update it without errors.""" state = transports.State() state.update(transports.State.scheduled) - self.assertEqual(state.current, transports.State.scheduled) + assert state.current == transports.State.scheduled state.update(transports.State.running) - self.assertEqual(state.current, transports.State.running) + assert state.current == transports.State.running state.update(transports.State.success) - self.assertEqual(state.current, transports.State.success) + assert state.current == transports.State.success state.update(transports.State.pending) - self.assertEqual(state.current, transports.State.pending) + assert state.current == transports.State.pending -class TestBaseWorker(unittest.TestCase): +class TestBaseWorker(object): """Concrete BaseWorker class for tests.""" def test_instantiation(self): """Raise if instantiated directly, should return an instance of BaseWorker if inherited.""" - with self.assertRaises(TypeError): + with pytest.raises(TypeError): transports.BaseWorker({}) # pylint: disable=abstract-class-instantiated - self.assertIsInstance(ConcreteBaseWorker({}), transports.BaseWorker) + assert isinstance(ConcreteBaseWorker({}), transports.BaseWorker) @mock.patch.dict(transports.os.environ, {}, clear=True) def test_init(self): @@ -313,141 +312,142 @@ config = {'transport': 'test_transport', 'environment': env_dict} - self.assertEqual(transports.os.environ, {}) + assert transports.os.environ == {} worker = ConcreteBaseWorker(config) - self.assertEqual(transports.os.environ, env_dict) - self.assertEqual(worker.config, config) + assert transports.os.environ == env_dict + assert worker.config == config -class TestConcreteBaseWorker(unittest.TestCase): +class TestConcreteBaseWorker(object): """BaseWorker test class.""" - def setUp(self): + def setup_method(self, _): """Initialize default properties and instances.""" + # pylint: disable=attribute-defined-outside-init self.worker = ConcreteBaseWorker({}) self.hosts = ['node1', 'node2'] self.commands = [transports.Command('command1'), transports.Command('command2')] def test_hosts_getter(self): """Access to hosts property should return an empty list if not set and the list of hosts otherwise.""" - self.assertListEqual(self.worker.hosts, []) + assert self.worker.hosts == [] self.worker._hosts = self.hosts - self.assertListEqual(self.worker.hosts, self.hosts) + assert self.worker.hosts == self.hosts def test_hosts_setter(self): """Raise WorkerError if trying to set it not to an iterable, set it otherwise.""" - with self.assertRaisesRegexp(transports.WorkerError, r'hosts must be a list'): + with pytest.raises(transports.WorkerError, match='hosts must be a list'): self.worker.hosts = 'not-list' self.worker.hosts = self.hosts - self.assertListEqual(self.worker._hosts, self.hosts) + assert self.worker._hosts == self.hosts def test_commands_getter(self): """Access to commands property should return an empty list if not set and the list of commands otherwise.""" - self.assertListEqual(self.worker.commands, []) + assert self.worker.commands == [] self.worker._commands = self.commands - self.assertListEqual(self.worker.commands, self.commands) + assert self.worker.commands == self.commands self.worker._commands = None - self.assertListEqual(self.worker.commands, []) + assert self.worker.commands == [] def test_commands_setter(self): """Raise WorkerError if trying to set it not to a list, set it otherwise.""" - with self.assertRaisesRegexp(transports.WorkerError, r'commands must be a list'): + with pytest.raises(transports.WorkerError, match='commands must be a list'): self.worker.commands = 'invalid_value' - with self.assertRaisesRegexp(transports.WorkerError, r'commands must be a list of Command objects or strings'): + with pytest.raises(transports.WorkerError, match='commands must be a list of Command objects'): self.worker.commands = [1, 'command2'] self.worker.commands = self.commands - self.assertListEqual(self.worker._commands, self.commands) + assert self.worker._commands == self.commands self.worker.commands = None - self.assertIsNone(self.worker._commands) + assert self.worker._commands is None self.worker.commands = ['command1', 'command2'] - self.assertListEqual(self.worker._commands, self.commands) + assert self.worker._commands == self.commands def test_timeout_getter(self): """Return default value if not set, the value otherwise.""" - self.assertEqual(self.worker.timeout, 0) + assert self.worker.timeout == 0 self.worker._timeout = 10 - self.assertEqual(self.worker.timeout, 10) + assert self.worker.timeout == 10 def test_timeout_setter(self): """Raise WorkerError if not a positive integer, set it otherwise.""" message = r'timeout must be a positive integer' - with self.assertRaisesRegexp(transports.WorkerError, message): + with pytest.raises(transports.WorkerError, match=message): self.worker.timeout = -1 - with self.assertRaisesRegexp(transports.WorkerError, message): + with pytest.raises(transports.WorkerError, match=message): self.worker.timeout = 0 self.worker.timeout = 10 - self.assertEqual(self.worker._timeout, 10) + assert self.worker._timeout == 10 self.worker.timeout = None - self.assertEqual(self.worker._timeout, None) + assert self.worker._timeout is None def test_success_threshold_getter(self): """Return default value if not set, the value otherwise.""" - self.assertAlmostEqual(self.worker.success_threshold, 1.0) + assert self.worker.success_threshold == pytest.approx(1.0) for success_threshold in (0.0, 0.0001, 0.5, 0.99): self.worker._success_threshold = success_threshold - self.assertAlmostEqual(self.worker.success_threshold, success_threshold) + assert self.worker.success_threshold == pytest.approx(success_threshold) def test_success_threshold_setter(self): """Raise WorkerError if not float between 0 and 1, set it otherwise.""" message = r'success_threshold must be a float beween 0 and 1' - with self.assertRaisesRegexp(transports.WorkerError, message): + with pytest.raises(transports.WorkerError, match=message): self.worker.success_threshold = 1 - with self.assertRaisesRegexp(transports.WorkerError, message): + with pytest.raises(transports.WorkerError, match=message): self.worker.success_threshold = -0.1 self.worker.success_threshold = 0.3 - self.assertAlmostEqual(self.worker._success_threshold, 0.3) + assert self.worker._success_threshold == pytest.approx(0.3) def test_batch_size_getter(self): """Return default value if not set, the value otherwise.""" self.worker.hosts = self.hosts - self.assertEqual(self.worker.batch_size, len(self.hosts)) + assert self.worker.batch_size == len(self.hosts) self.worker._batch_size = 1 - self.assertEqual(self.worker.batch_size, 1) + assert self.worker.batch_size == 1 def test_batch_size_setter(self): """Raise WorkerError if not positive integer, set it otherwise forcing it to len(hosts) if greater.""" - with self.assertRaisesRegexp(transports.WorkerError, r'batch_size must be a positive integer'): + with pytest.raises(transports.WorkerError, match='batch_size must be a positive integer'): self.worker.batch_size = -1 - with self.assertRaisesRegexp(transports.WorkerError, r'batch_size must be a positive integer'): + with pytest.raises(transports.WorkerError, match='batch_size must be a positive integer'): self.worker.batch_size = 0 self.worker.hosts = self.hosts self.worker.batch_size = 10 - self.assertEqual(self.worker._batch_size, len(self.hosts)) + assert self.worker._batch_size == len(self.hosts) self.worker.batch_size = 1 - self.assertEqual(self.worker._batch_size, 1) + assert self.worker._batch_size == 1 def test_batch_sleep_getter(self): """Return default value if not set, the value otherwise.""" - self.assertAlmostEqual(self.worker.batch_sleep, 0.0) + assert self.worker.batch_sleep == pytest.approx(0.0) self.worker._batch_sleep = 10.0 - self.assertAlmostEqual(self.worker.batch_sleep, 10.0) + assert self.worker.batch_sleep == pytest.approx(10.0) def test_batch_sleep_setter(self): """Raise WorkerError if not positive integer, set it otherwise.""" message = r'batch_sleep must be a positive float' - with self.assertRaisesRegexp(transports.WorkerError, message): + with pytest.raises(transports.WorkerError, match=message): self.worker.batch_sleep = 1 - with self.assertRaisesRegexp(transports.WorkerError, message): + with pytest.raises(transports.WorkerError, match=message): self.worker.batch_sleep = '1' - with self.assertRaisesRegexp(transports.WorkerError, message): + with pytest.raises(transports.WorkerError, match=message): self.worker.batch_sleep = -1.0 self.worker.batch_sleep = 10.0 - self.assertAlmostEqual(self.worker._batch_sleep, 10.0) + assert self.worker._batch_sleep == pytest.approx(10.0) -class TestModuleFunctions(unittest.TestCase): +class TestModuleFunctions(object): """Transports module functions test class.""" def test_validate_list(self): @@ -458,9 +458,9 @@ transports.validate_list('Test', ['value1', 'value2']) message = r'Test must be a list' - func = transports.validate_list for invalid_value in (0, 'invalid_value', {'invalid': 'value'}): - self.assertRaisesRegexp(transports.WorkerError, message, func, 'Test', invalid_value) + with pytest.raises(transports.WorkerError, match=message): + transports.validate_list('Test', invalid_value) def test_validate_positive_integer(self): """Should raise a WorkerError if the argument is not a positive integer or None.""" @@ -469,11 +469,11 @@ transports.validate_positive_integer('Test', 100) message = r'Test must be a positive integer' - func = transports.validate_positive_integer for invalid_value in (0, -1, 'invalid_value', ['invalid_value']): - self.assertRaisesRegexp(transports.WorkerError, message, func, 'Test', invalid_value) + with pytest.raises(transports.WorkerError, match=message): + transports.validate_positive_integer('Test', invalid_value) def test_raise_error(self): """Should raise a WorkerError.""" - with self.assertRaisesRegexp(transports.WorkerError, 'Test message'): + with pytest.raises(transports.WorkerError, match='Test message'): transports.raise_error('Test', 'message', 'value') -- To view, visit https://gerrit.wikimedia.org/r/361274 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ia75f32b212e36d613c784765a52f1317ac690810 Gerrit-PatchSet: 1 Gerrit-Project: operations/software/cumin Gerrit-Branch: master Gerrit-Owner: Volans <rcocci...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits