http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/token_generator_test.py ---------------------------------------------------------------------- diff --git a/token_generator_test.py b/token_generator_test.py index c4846e6..b6e9025 100644 --- a/token_generator_test.py +++ b/token_generator_test.py @@ -1,15 +1,18 @@ -# coding: utf-8 import os import subprocess import time - +import pytest import parse +import logging + from cassandra.util import sortedset from ccmlib import common -from dtest import DISABLE_VNODES, Tester, debug +from dtest import Tester from tools.data import rows_to_list -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) @since('2.0.16', max_version='3.0.0') @@ -36,7 +39,7 @@ class TestTokenGenerator(Tester): for n in nodes: args.append(str(n)) - debug('Invoking {}'.format(args)) + logger.debug('Invoking {}'.format(args)) token_gen_output = subprocess.check_output(args) lines = token_gen_output.split("\n") dc_tokens = None @@ -44,19 +47,19 @@ class TestTokenGenerator(Tester): for line in lines: if line.startswith("DC #"): if dc_tokens is not None: - self.assertGreater(dc_tokens.__len__(), 0, "dc_tokens is empty from token-generator {}".format(args)) + assert dc_tokens.__len__(), 0 > "dc_tokens is empty from token-generator {}".format(args) generated_tokens.append(dc_tokens) dc_tokens = [] else: if line: m = parse.search('Node #{node_num:d}:{:s}{node_token:d}', line) - self.assertIsNotNone(m, "Line \"{}\" does not match pattern from token-generator {}".format(line, args)) + assert m, "Line \"{}\" does not match pattern from token-generator {}".format(line is not None, args) node_num = int(m.named['node_num']) node_token = int(m.named['node_token']) dc_tokens.append(node_token) - self.assertEqual(node_num, dc_tokens.__len__(), "invalid token count from token-generator {}".format(args)) - self.assertIsNotNone(dc_tokens, "No tokens from token-generator {}".format(args)) - self.assertGreater(dc_tokens.__len__(), 0, "No tokens from token-generator {}".format(args)) + assert node_num, dc_tokens.__len__() == "invalid token count from token-generator {}".format(args) + assert dc_tokens is not None, "No tokens from token-generator {}".format(args) + assert dc_tokens.__len__(), 0 > "No tokens from token-generator {}".format(args) generated_tokens.append(dc_tokens) return generated_tokens @@ -77,10 +80,10 @@ class TestTokenGenerator(Tester): # remove these from cluster options - otherwise node's config would be overridden with cluster._config_options_ cluster._config_options.__delitem__('num_tokens') - if not DISABLE_VNODES: + if self.dtest_config.use_vnodes: cluster._config_options.__delitem__('initial_token') - self.assertTrue(not cluster.nodelist(), "nodelist() already initialized") + assert not cluster.nodelist(), "nodelist() already initialized" cluster.populate(nodes, use_vnodes=False, tokens=generated_tokens[0]).start(wait_for_binary_proto=True) time.sleep(0.2) @@ -95,22 +98,22 @@ class TestTokenGenerator(Tester): tokens = [] local_tokens = rows_to_list(session.execute("SELECT tokens FROM system.local"))[0] - self.assertEqual(local_tokens.__len__(), 1, "too many tokens for peer") + assert local_tokens.__len__(), 1 == "too many tokens for peer" for tok in local_tokens: tokens += tok rows = rows_to_list(session.execute("SELECT tokens FROM system.peers")) - self.assertEqual(rows.__len__(), nodes - 1) + assert rows.__len__() == nodes - 1 for row in rows: peer_tokens = row[0] - self.assertEqual(peer_tokens.__len__(), 1, "too many tokens for peer") + assert peer_tokens.__len__(), 1 == "too many tokens for peer" for tok in peer_tokens: tokens.append(tok) - self.assertEqual(tokens.__len__(), dc_tokens.__len__()) + assert tokens.__len__() == dc_tokens.__len__() for cluster_token in tokens: tok = int(cluster_token) - self.assertGreaterEqual(dc_tokens.index(tok), 0, "token in cluster does not match generated tokens") + assert dc_tokens.index(tok), 0 >= "token in cluster does not match generated tokens" def token_gen_def_test(self, nodes=3): """ Validate token-generator with Murmur3Partitioner with default token-generator behavior """ @@ -148,23 +151,23 @@ class TestTokenGenerator(Tester): all_tokens = sortedset() node_count = 0 generated_tokens = self.call_token_generator(self.cluster.get_install_dir(), random, dc_nodes) - self.assertEqual(dc_nodes.__len__(), generated_tokens.__len__()) + assert dc_nodes.__len__() == generated_tokens.__len__() for n in range(0, dc_nodes.__len__()): nodes = dc_nodes[n] node_count += nodes tokens = generated_tokens[n] - self.assertEqual(nodes, tokens.__len__()) + assert nodes == tokens.__len__() for tok in tokens: - self.assertTrue(t_min <= tok < t_max, "Generated token %r out of Murmur3Partitioner range %r..%r" % (tok, t_min, t_max - 1)) - self.assertTrue(not all_tokens.__contains__(tok), "Duplicate token %r for nodes-counts %r" % (tok, dc_nodes)) + assert t_min <= tok < t_max, "Generated token %r out of Murmur3Partitioner range %r..%r" % (tok, t_min, t_max - 1) + assert not all_tokens.__contains__(tok), "Duplicate token %r for nodes-counts %r" % (tok, dc_nodes) all_tokens.add(tok) - self.assertEqual(all_tokens.__len__(), node_count, "Number of tokens %r and number of nodes %r does not match for %r" % (all_tokens.__len__(), node_count, dc_nodes)) + assert all_tokens.__len__() == node_count, "Number of tokens %r and number of nodes %r does not match for %r" % (all_tokens.__len__(), node_count, dc_nodes) - def multi_dc_tokens_default_test(self): + def test_multi_dc_tokens_default(self): self._multi_dc_tokens() - def multi_dc_tokens_murmur3_test(self): + def test_multi_dc_tokens_murmur3(self): self._multi_dc_tokens(False) - def multi_dc_tokens_random_test(self): + def test_multi_dc_tokens_random(self): self._multi_dc_tokens(True)
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/assertions.py ---------------------------------------------------------------------- diff --git a/tools/assertions.py b/tools/assertions.py index 2e88067..864a4df 100644 --- a/tools/assertions.py +++ b/tools/assertions.py @@ -1,18 +1,16 @@ - import re from time import sleep +from tools.misc import list_to_hashed_dict from cassandra import (InvalidRequest, ReadFailure, ReadTimeout, Unauthorized, Unavailable, WriteFailure, WriteTimeout) from cassandra.query import SimpleStatement -from nose.tools import (assert_equal, assert_false, assert_regexp_matches, - assert_true) """ -The assertion methods in this file are used to structure, execute, and test different queries and scenarios. Use these anytime you are trying -to check the content of a table, the row count of a table, if a query should raise an exception, etc. These methods handle error messaging -well, and will help discovering and treating bugs. +The assertion methods in this file are used to structure, execute, and test different queries and scenarios. +Use these anytime you are trying to check the content of a table, the row count of a table, if a query should +raise an exception, etc. These methods handle error messaging well, and will help discovering and treating bugs. An example: Imagine some table, test: @@ -57,7 +55,8 @@ def _assert_exception(fun, *args, **kwargs): fun(*args) except expected as e: if matching is not None: - assert_regexp_matches(str(e), matching) + regex = re.compile(matching) + assert regex.match(repr(e)) is None except Exception as e: raise e else: @@ -73,13 +72,14 @@ def assert_exception(session, query, matching=None, expected=None): def assert_unavailable(fun, *args): """ - Attempt to execute a function, and assert Unavailable, WriteTimeout, WriteFailure, ReadTimeout, or ReadFailure exception is raised. + Attempt to execute a function, and assert Unavailable, WriteTimeout, WriteFailure, + ReadTimeout, or ReadFailure exception is raised. @param fun Function to be executed @param *args Arguments to be passed to the function Examples: assert_unavailable(session2.execute, "SELECT * FROM ttl_table;") - assert_unavailable(lambda c: debug(c.execute(statement)), session) + assert_unavailable(lambda c: logger.debug(c.execute(statement)), session) """ _assert_exception(fun, *args, expected=(Unavailable, WriteTimeout, WriteFailure, ReadTimeout, ReadFailure)) @@ -106,8 +106,10 @@ def assert_unauthorized(session, query, message): @param message Expected error message Examples: - assert_unauthorized(session, "ALTER USER cassandra NOSUPERUSER", "You aren't allowed to alter your own superuser status") - assert_unauthorized(cathy, "ALTER TABLE ks.cf ADD val int", "User cathy has no ALTER permission on <table ks.cf> or any of its parents") + assert_unauthorized(session, "ALTER USER cassandra NOSUPERUSER", + "You aren't allowed to alter your own superuser status") + assert_unauthorized(cathy, "ALTER TABLE ks.cf ADD val int", + "User cathy has no ALTER permission on <table ks.cf> or any of its parents") """ assert_exception(session, query, matching=message, expected=Unauthorized) @@ -165,8 +167,8 @@ def assert_all(session, query, expected, cl=None, ignore_order=False, timeout=No res = session.execute(simple_query) if timeout is None else session.execute(simple_query, timeout=timeout) list_res = _rows_to_list(res) if ignore_order: - expected = sorted(expected) - list_res = sorted(list_res) + expected = list_to_hashed_dict(expected) + list_res = list_to_hashed_dict(list_res) assert list_res == expected, "Expected {} from {}, but got {}".format(expected, query, list_res) @@ -185,16 +187,17 @@ def assert_almost_equal(*args, **kwargs): vmax = max(args) vmin = min(args) error_message = '' if 'error_message' not in kwargs else kwargs['error_message'] - assert vmin > vmax * (1.0 - error) or vmin == vmax, "values not within {:.2f}% of the max: {} ({})".format(error * 100, args, error_message) + assert vmin > vmax * (1.0 - error) or vmin == vmax, \ + "values not within {:.2f}% of the max: {} ({})".format(error * 100, args, error_message) def assert_row_count(session, table_name, expected, where=None): """ Assert the number of rows in a table matches expected. - @params session Session to use + @param session Session to use @param table_name Name of the table to query @param expected Number of rows expected to be in table - + @param where string to append to CQL select query as where clause Examples: assert_row_count(self.session1, 'ttl_table', 1) """ @@ -214,6 +217,7 @@ def assert_crc_check_chance_equal(session, table, expected, ks="ks", view=False) Assert crc_check_chance equals expected for a given table or view @param session Session to use @param table Name of the table or view to check + @param expected Expected value to assert on that query result matches @param ks Optional Name of the keyspace @param view Optional Boolean flag indicating if the table is a view @@ -226,13 +230,13 @@ def assert_crc_check_chance_equal(session, table, expected, ks="ks", view=False) """ if view: assert_one(session, - "SELECT crc_check_chance from system_schema.views WHERE keyspace_name = 'ks' AND " - "view_name = '{table}';".format(table=table), + "SELECT crc_check_chance from system_schema.views WHERE keyspace_name = '{keyspace}' AND " + "view_name = '{table}';".format(keyspace=ks, table=table), [expected]) else: assert_one(session, - "SELECT crc_check_chance from system_schema.tables WHERE keyspace_name = 'ks' AND " - "table_name = '{table}';".format(table=table), + "SELECT crc_check_chance from system_schema.tables WHERE keyspace_name = '{keyspace}' AND " + "table_name = '{table}';".format(keyspace=ks, table=table), [expected]) @@ -245,9 +249,9 @@ def assert_length_equal(object_with_length, expected_length): Examples: assert_length_equal(res, nb_counter) """ - assert_equal(len(object_with_length), expected_length, - "Expected {} to have length {}, but instead is of length {}".format(object_with_length, - expected_length, len(object_with_length))) + assert len(object_with_length) == expected_length, \ + "Expected {} to have length {}, but instead is of length {}"\ + .format(object_with_length, expected_length, len(object_with_length)) def assert_not_running(node): @@ -260,7 +264,7 @@ def assert_not_running(node): sleep(1) attempts = attempts + 1 - assert_false(node.is_running()) + assert not node.is_running() def assert_read_timeout_or_failure(session, query): @@ -281,9 +285,15 @@ def assert_stderr_clean(err, acceptable_errors=None): "Failed to connect over JMX; not collecting these stats"] regex_str = "^({}|\s*|\n)*$".format("|".join(acceptable_errors)) - match = re.search(regex_str, err) + err_str = err.decode("utf-8").strip() + # empty string, as good as we can get for a clean stderr output! + if not err_str: + return - assert_true(match, "Attempted to check that stderr was empty. Instead, stderr is {}, but the regex used to check against stderr is {}".format(err, regex_str)) + match = re.search(regex_str, err_str) + + assert match, "Attempted to check that stderr was empty. Instead, stderr is {}, but the regex used to check " \ + "stderr is {}".format(err_str, regex_str) def assert_bootstrap_state(tester, node, expected_bootstrap_state): @@ -298,3 +308,41 @@ def assert_bootstrap_state(tester, node, expected_bootstrap_state): """ session = tester.patient_exclusive_cql_connection(node) assert_one(session, "SELECT bootstrapped FROM system.local WHERE key='local'", [expected_bootstrap_state]) + session.shutdown() + + +def assert_lists_equal_ignoring_order(list1, list2, sort_key=None): + """ + asserts that the contents of the two provided lists are equal + but ignoring the order that the items of the lists are actually in + :param list1: list to check if it's contents are equal to list2 + :param list2: list to check if it's contents are equal to list1 + :param sort_key: if the contents of the list are of type dict, the + key to use of each object to sort the overall object with + """ + normalized_list1 = [] + for obj in list1: + normalized_list1.append(obj) + + normalized_list2 = [] + for obj in list2: + normalized_list2.append(obj) + + if not sort_key: + sorted_list1 = sorted(normalized_list1, key=lambda elm: elm[0]) + sorted_list2 = sorted(normalized_list2, key=lambda elm: elm[0]) + else: + # first always sort by "id" + # that way we get a two factor sort which will increase the chance of ordering lists exactly the same + if not sort_key == 'id' and 'id' in list1[0].keys(): + sorted_list1 = sorted(sorted(normalized_list1, key=lambda elm: elm["id"]), key=lambda elm: elm[sort_key]) + sorted_list2 = sorted(sorted(normalized_list2, key=lambda elm: elm["id"]), key=lambda elm: elm[sort_key]) + else: + if isinstance(list1[0]['id'], (int, float)): + sorted_list1 = sorted(normalized_list1, key=lambda elm: elm[sort_key]) + sorted_list2 = sorted(normalized_list2, key=lambda elm: elm[sort_key]) + else: + sorted_list1 = sorted(normalized_list1, key=lambda elm: str(elm[sort_key])) + sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key])) + + assert sorted_list1 == sorted_list2 http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/context.py ---------------------------------------------------------------------- diff --git a/tools/context.py b/tools/context.py index a17a64c..0b39534 100644 --- a/tools/context.py +++ b/tools/context.py @@ -5,8 +5,6 @@ making those context managers function. import logging from contextlib import contextmanager -from six import print_ - from tools.env import ALLOW_NOISY_LOGGING @@ -22,7 +20,7 @@ def log_filter(log_id, expected_strings=None): logger.addFilter(log_filter) yield if log_filter.records_silenced > 0: - print_("Logs were filtered to remove messages deemed unimportant, total count: {}".format(log_filter.records_silenced)) + print("Logs were filtered to remove messages deemed unimportant, total count: %d" % log_filter.records_silenced) logger.removeFilter(log_filter) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/data.py ---------------------------------------------------------------------- diff --git a/tools/data.py b/tools/data.py index 2f86166..d5607e0 100644 --- a/tools/data.py +++ b/tools/data.py @@ -1,14 +1,16 @@ import time +import logging from cassandra import ConsistencyLevel from cassandra.concurrent import execute_concurrent_with_args from cassandra.query import SimpleStatement -from nose.tools import assert_equal, assert_true -import assertions -from dtest import debug, create_cf, DtestTimeoutError +from . import assertions +from dtest import create_cf, DtestTimeoutError from tools.funcutils import get_rate_limited_function +logger = logging.getLogger(__name__) + def create_c1c2_table(tester, session, read_repair=None): create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'}, read_repair=read_repair) @@ -33,13 +35,13 @@ def query_c1c2(session, key, consistency=ConsistencyLevel.QUORUM, tolerate_missi if not tolerate_missing: assertions.assert_length_equal(rows, 1) res = rows[0] - assert_true(len(res) == 2 and res[0] == 'value1' and res[1] == 'value2', res) + assert len(res) == 2 and res[0] == 'value1' and res[1] == 'value2', res if must_be_missing: assertions.assert_length_equal(rows, 0) def insert_columns(tester, session, key, columns_count, consistency=ConsistencyLevel.QUORUM, offset=0): - upds = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%06d\'" % (i, key, i) for i in xrange(offset * columns_count, columns_count * (offset + 1))] + upds = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%06d\'" % (i, key, i) for i in range(offset * columns_count, columns_count * (offset + 1))] query = 'BEGIN BATCH %s; APPLY BATCH' % '; '.join(upds) simple_query = SimpleStatement(query, consistency_level=consistency) session.execute(simple_query) @@ -49,8 +51,8 @@ def query_columns(tester, session, key, columns_count, consistency=ConsistencyLe query = SimpleStatement('SELECT c, v FROM cf WHERE key=\'k%s\' AND c >= \'c%06d\' AND c <= \'c%06d\'' % (key, offset, columns_count + offset - 1), consistency_level=consistency) res = list(session.execute(query)) assertions.assert_length_equal(res, columns_count) - for i in xrange(0, columns_count): - assert_equal(res[i][1], 'value{}'.format(i + offset)) + for i in range(0, columns_count): + assert res[i][1] == 'value{}'.format(i + offset) # Simple puts and get (on one row), testing both reads by names and by slice, @@ -74,20 +76,20 @@ def putget(cluster, session, cl=ConsistencyLevel.QUORUM): def _put_with_overwrite(cluster, session, nb_keys, cl=ConsistencyLevel.QUORUM): - for k in xrange(0, nb_keys): - kvs = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%02d\'" % (i, k, i) for i in xrange(0, 100)] + for k in range(0, nb_keys): + kvs = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%02d\'" % (i, k, i) for i in range(0, 100)] query = SimpleStatement('BEGIN BATCH %s APPLY BATCH' % '; '.join(kvs), consistency_level=cl) session.execute(query) time.sleep(.01) cluster.flush() - for k in xrange(0, nb_keys): - kvs = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%02d\'" % (i * 4, k, i * 2) for i in xrange(0, 50)] + for k in range(0, nb_keys): + kvs = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%02d\'" % (i * 4, k, i * 2) for i in range(0, 50)] query = SimpleStatement('BEGIN BATCH %s APPLY BATCH' % '; '.join(kvs), consistency_level=cl) session.execute(query) time.sleep(.01) cluster.flush() - for k in xrange(0, nb_keys): - kvs = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%02d\'" % (i * 20, k, i * 5) for i in xrange(0, 20)] + for k in range(0, nb_keys): + kvs = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%02d\'" % (i * 20, k, i * 5) for i in range(0, 20)] query = SimpleStatement('BEGIN BATCH %s APPLY BATCH' % '; '.join(kvs), consistency_level=cl) session.execute(query) time.sleep(.01) @@ -96,13 +98,13 @@ def _put_with_overwrite(cluster, session, nb_keys, cl=ConsistencyLevel.QUORUM): def _validate_row(cluster, res): assertions.assert_length_equal(res, 100) - for i in xrange(0, 100): + for i in range(0, 100): if i % 5 == 0: - assert_equal(res[i][2], 'value{}'.format(i * 4), 'for {}, expecting value{}, got {}'.format(i, i * 4, res[i][2])) + assert res[i][2] == 'value{}'.format(i * 4), 'for {}, expecting value{}, got {}'.format(i, i * 4, res[i][2]) elif i % 2 == 0: - assert_equal(res[i][2], 'value{}'.format(i * 2), 'for {}, expecting value{}, got {}'.format(i, i * 2, res[i][2])) + assert res[i][2] == 'value{}'.format(i * 2), 'for {}, expecting value{}, got {}'.format(i, i * 2, res[i][2]) else: - assert_equal(res[i][2], 'value{}'.format(i), 'for {}, expecting value{}, got {}'.format(i, i, res[i][2])) + assert res[i][2] == 'value{}'.format(i), 'for {}, expecting value{}, got {}'.format(i, i, res[i][2]) # Simple puts and range gets, with overwrites and flushes between inserts to @@ -116,7 +118,7 @@ def range_putget(cluster, session, cl=ConsistencyLevel.QUORUM): rows = [result for result in paged_results] assertions.assert_length_equal(rows, keys * 100) - for k in xrange(0, keys): + for k in range(0, keys): res = rows[:100] del rows[:100] _validate_row(cluster, res) @@ -158,9 +160,9 @@ def block_until_index_is_built(node, session, keyspace, table_name, idx_name): DtestTimeoutError if it is not. """ start = time.time() - rate_limited_debug = get_rate_limited_function(debug, 5) + rate_limited_debug_logger = get_rate_limited_function(logger.debug, 5) while time.time() < start + 30: - rate_limited_debug("waiting for index to build") + rate_limited_debug_logger("waiting for index to build") time.sleep(1) if index_is_built(node, session, keyspace, table_name, idx_name): break http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/datahelp.py ---------------------------------------------------------------------- diff --git a/tools/datahelp.py b/tools/datahelp.py index 891fac7..86afc03 100644 --- a/tools/datahelp.py +++ b/tools/datahelp.py @@ -35,13 +35,13 @@ def parse_headers_into_list(data): # throw out leading/trailing space and pipes # so we can split on the data without getting # extra empty fields - rows = map(strip, data.split('\n')) + rows = list(map(strip, data.split('\n'))) # remove any remaining empty lines (i.e. '') from data - rows = filter(None, rows) + rows = [_f for _f in rows if _f] # separate headers from actual data and remove extra spaces from them - headers = [unicode(h.strip()) for h in rows.pop(0).split('|')] + headers = [str(h.strip()) for h in rows.pop(0).split('|')] return headers @@ -77,10 +77,10 @@ def parse_row_into_dict(row, headers, format_funcs=None): ) return multirows - row_map = dict(zip(headers, row_cells)) + row_map = dict(list(zip(headers, row_cells))) if format_funcs: - for colname, value in row_map.items(): + for colname, value in list(row_map.items()): func = format_funcs.get(colname) if func is not None: @@ -110,10 +110,10 @@ def parse_data_into_dicts(data, format_funcs=None): # throw out leading/trailing space and pipes # so we can split on the data without getting # extra empty fields - rows = map(strip, data.split('\n')) + rows = list(map(strip, data.split('\n'))) # remove any remaining empty/decoration lines (i.e. '') from data - rows = filter(row_describes_data, rows) + rows = list(filter(row_describes_data, rows)) # remove headers headers = parse_headers_into_list(rows.pop(0)) @@ -149,13 +149,13 @@ def create_rows(data, session, table_name, cl=None, format_funcs=None, prefix='' # use the first dictionary to build a prepared statement for all prepared = session.prepare( "{prefix} INSERT INTO {table} ({cols}) values ({vals}) {postfix}".format( - prefix=prefix, table=table_name, cols=', '.join(dicts[0].keys()), - vals=', '.join('?' for k in dicts[0].keys()), postfix=postfix) + prefix=prefix, table=table_name, cols=', '.join(list(dicts[0].keys())), + vals=', '.join('?' for k in list(dicts[0].keys())), postfix=postfix) ) if cl is not None: prepared.consistency_level = cl - query_results = execute_concurrent_with_args(session, prepared, [d.values() for d in dicts]) + query_results = execute_concurrent_with_args(session, prepared, [list(d.values()) for d in dicts]) for i, (status, result_or_exc) in enumerate(query_results): # should maybe check status here before appening to expected values http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/decorators.py ---------------------------------------------------------------------- diff --git a/tools/decorators.py b/tools/decorators.py deleted file mode 100644 index a11a4fc..0000000 --- a/tools/decorators.py +++ /dev/null @@ -1,106 +0,0 @@ -import functools -import unittest -from distutils.version import LooseVersion - -from nose.plugins.attrib import attr -from nose.tools import assert_in, assert_is_instance - -from dtest import DISABLE_VNODES - - -class since(object): - - def __init__(self, cass_version, max_version=None): - self.cass_version = LooseVersion(cass_version) - self.max_version = max_version - if self.max_version is not None: - self.max_version = LooseVersion(self.max_version) - - def _skip_msg(self, version): - if version < self.cass_version: - return "%s < %s" % (version, self.cass_version) - if self.max_version and version > self.max_version: - return "%s > %s" % (version, self.max_version) - - def _wrap_setUp(self, cls): - orig_setUp = cls.setUp - - @functools.wraps(cls.setUp) - def wrapped_setUp(obj, *args, **kwargs): - obj.max_version = self.max_version - orig_setUp(obj, *args, **kwargs) - version = obj.cluster.version() - msg = self._skip_msg(version) - if msg: - obj.skip(msg) - - cls.setUp = wrapped_setUp - return cls - - def _wrap_function(self, f): - @functools.wraps(f) - def wrapped(obj): - obj.max_version = self.max_version - version = obj.cluster.version() - msg = self._skip_msg(version) - if msg: - obj.skip(msg) - f(obj) - return wrapped - - def __call__(self, skippable): - if isinstance(skippable, type): - return self._wrap_setUp(skippable) - return self._wrap_function(skippable) - - -def no_vnodes(): - """ - Skips the decorated test or test class if using vnodes. - """ - return unittest.skipIf(not DISABLE_VNODES, 'Test disabled for vnodes') - - -def known_failure(failure_source, jira_url, flaky=False, notes=''): - """ - Tag a test as a known failure. Associate it with the URL for a JIRA - ticket and tag it as flaky or not. - - Valid values for failure_source include: 'cassandra', 'test', 'driver', and - 'systemic'. - - To run all known failures, use the functionality provided by the nosetests - attrib plugin, using the known_failure attributes: - - # only run tests that are known to fail - $ nosetests -a known_failure - # only run tests that are not known to fail - $ nosetests -a !known_failure - # only run tests that fail because of cassandra bugs - $ nosetests -A "'cassandra' in [d['failure_source'] for d in known_failure]" - - Known limitations: a given test may only be tagged once and still work as - expected with the attrib plugin machinery; if you decorate a test with - known_failure multiple times, the known_failure attribute of that test - will have the value applied by the outermost instance of the decorator. - """ - valid_failure_sources = ('cassandra', 'test', 'systemic', 'driver') - - def wrapper(f): - assert_in(failure_source, valid_failure_sources) - assert_is_instance(flaky, bool) - - try: - existing_failure_annotations = f.known_failure - except AttributeError: - existing_failure_annotations = [] - - new_annotation = [{'failure_source': failure_source, 'jira_url': jira_url, 'notes': notes, 'flaky': flaky}] - - failure_annotations = existing_failure_annotations + new_annotation - - tagged_func = attr(known_failure=failure_annotations)(f) - - return tagged_func - - return wrapper http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/files.py ---------------------------------------------------------------------- diff --git a/tools/files.py b/tools/files.py index c81cea0..7f0cd97 100644 --- a/tools/files.py +++ b/tools/files.py @@ -3,8 +3,9 @@ import os import re import sys import tempfile +import logging -from dtest import debug # Depending on dtest is not good long-term. +logger = logging.getLogger(__name__) def replace_in_file(filepath, search_replacements): @@ -37,5 +38,5 @@ def size_of_files_in_dir(dir_name, verbose=True): """ files = [os.path.join(dir_name, f) for f in os.listdir(dir_name)] if verbose: - debug('getting sizes of these files: {}'.format(files)) + logger.debug('getting sizes of these files: {}'.format(files)) return sum(os.path.getsize(f) for f in files) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/git.py ---------------------------------------------------------------------- diff --git a/tools/git.py b/tools/git.py index 2f5afa6..7daf4b8 100644 --- a/tools/git.py +++ b/tools/git.py @@ -1,22 +1,22 @@ import subprocess +import logging -from dtest import CASSANDRA_DIR, debug +logger = logging.getLogger(__name__) -def cassandra_git_branch(cdir=None): +def cassandra_git_branch(cassandra_dir): '''Get the name of the git branch at CASSANDRA_DIR. ''' - cdir = CASSANDRA_DIR if cdir is None else cdir try: - p = subprocess.Popen(['git', 'branch'], cwd=cdir, + p = subprocess.Popen(['git', 'branch'], cwd=cassandra_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except OSError as e: # e.g. if git isn't available, just give up and return None - debug('shelling out to git failed: {}'.format(e)) + logger.debug('shelling out to git failed: {}'.format(e)) return out, err = p.communicate() # fail if git failed if p.returncode != 0: - raise RuntimeError('Git printed error: {err}'.format(err=err)) - [current_branch_line] = [line for line in out.splitlines() if line.startswith('*')] + raise RuntimeError('Git printed error: {err}'.format(err=err.decode("utf-8"))) + [current_branch_line] = [line for line in out.decode("utf-8").splitlines() if line.startswith('*')] return current_branch_line[1:].strip() http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/hacks.py ---------------------------------------------------------------------- diff --git a/tools/hacks.py b/tools/hacks.py index 9faa778..f68e20d 100644 --- a/tools/hacks.py +++ b/tools/hacks.py @@ -4,13 +4,14 @@ weirdnesses in Cassandra. """ import os import time +import logging from cassandra.concurrent import execute_concurrent -from nose.tools import assert_less_equal -import dtest from tools.funcutils import get_rate_limited_function +logger = logging.getLogger(__name__) + def _files_in(directory): return { @@ -20,7 +21,7 @@ def _files_in(directory): def advance_to_next_cl_segment(session, commitlog_dir, keyspace_name='ks', table_name='junk_table', - timeout=60, debug=True): + timeout=60): """ This is a hack to work around problems like CASSANDRA-11811. @@ -29,15 +30,6 @@ def advance_to_next_cl_segment(session, commitlog_dir, replaying some mutations that initialize system tables, so this function advances the node to the next CL by filling up the first one. """ - if debug: - _debug = dtest.debug - else: - def _debug(*args, **kwargs): - """ - noop debug method - """ - pass - session.execute( 'CREATE TABLE {ks}.{tab} (' 'a uuid PRIMARY KEY, b uuid, c uuid, d uuid, ' @@ -58,17 +50,15 @@ def advance_to_next_cl_segment(session, commitlog_dir, start = time.time() stop_time = start + timeout - rate_limited_debug = get_rate_limited_function(_debug, 5) - _debug('attempting to write until we start writing to new CL segments: {}'.format(initial_cl_files)) + rate_limited_debug_logger = get_rate_limited_function(logger.debug, 5) + logger.debug('attempting to write until we start writing to new CL segments: {}'.format(initial_cl_files)) while _files_in(commitlog_dir) <= initial_cl_files: elapsed = time.time() - start - rate_limited_debug(' commitlog-advancing load step has lasted {s:.2f}s'.format(s=elapsed)) - assert_less_equal( - time.time(), stop_time, - "It's been over a {s}s and we haven't written a new " + rate_limited_debug_logger(' commitlog-advancing load step has lasted {s:.2f}s'.format(s=elapsed)) + assert ( + time.time() <= stop_time), "It's been over a {s}s and we haven't written a new " + \ "commitlog segment. Something is wrong.".format(s=timeout) - ) execute_concurrent( session, ((prepared_insert, ()) for _ in range(1000)), @@ -76,4 +66,4 @@ def advance_to_next_cl_segment(session, commitlog_dir, raise_on_first_error=True, ) - _debug('present commitlog segments: {}'.format(_files_in(commitlog_dir))) + logger.debug('present commitlog segments: {}'.format(_files_in(commitlog_dir))) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/intervention.py ---------------------------------------------------------------------- diff --git a/tools/intervention.py b/tools/intervention.py index 27b1977..6a8add8 100644 --- a/tools/intervention.py +++ b/tools/intervention.py @@ -1,8 +1,10 @@ import random import time +import logging + from threading import Thread -from dtest import debug +logger = logging.getLogger(__name__) class InterruptBootstrap(Thread): @@ -38,9 +40,9 @@ class InterruptCompaction(Thread): self.node.watch_log_for("Compacting(.*)%s" % (self.tablename,), from_mark=self.mark, filename=self.filename) if self.delay > 0: random_delay = random.uniform(0, self.delay) - debug("Sleeping for {} seconds".format(random_delay)) + logger.debug("Sleeping for {} seconds".format(random_delay)) time.sleep(random_delay) - debug("Killing node {}".format(self.node.address())) + logger.debug("Killing node {}".format(self.node.address())) self.node.stop(gently=False) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/jmxutils.py ---------------------------------------------------------------------- diff --git a/tools/jmxutils.py b/tools/jmxutils.py index 7468226..83459e0 100644 --- a/tools/jmxutils.py +++ b/tools/jmxutils.py @@ -1,13 +1,16 @@ import json import os import subprocess -from urllib2 import urlopen +import urllib.request +import urllib.parse +import logging import ccmlib.common as common -from dtest import warning from distutils.version import LooseVersion +logger = logging.getLogger(__name__) + JOLOKIA_JAR = os.path.join('lib', 'jolokia-jvm-1.2.3-agent.jar') CLASSPATH_SEP = ';' if common.is_win() else ':' JVM_OPTIONS = "jvm.options" @@ -18,7 +21,7 @@ def jolokia_classpath(): tools_jar = os.path.join(os.environ['JAVA_HOME'], 'lib', 'tools.jar') return CLASSPATH_SEP.join((tools_jar, JOLOKIA_JAR)) else: - warning("Environment variable $JAVA_HOME not present: jmx-based " + + logger.warning("Environment variable $JAVA_HOME not present: jmx-based " + "tests may fail because of missing $JAVA_HOME/lib/tools.jar.") return JOLOKIA_JAR @@ -50,7 +53,7 @@ def make_mbean(package, type, **kwargs): rv = 'org.apache.cassandra.%s:type=%s' % (package, type) if kwargs: rv += ',' + ','.join('{k}={v}'.format(k=k, v=v) - for k, v in kwargs.iteritems()) + for k, v in kwargs.items()) return rv @@ -204,9 +207,9 @@ class JolokiaAgent(object): try: subprocess.check_output(args, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as exc: - print "Failed to start jolokia agent (command was: %s): %s" % (' '.join(args), exc) - print "Exit status was: %d" % (exc.returncode,) - print "Output was: %s" % (exc.output,) + print("Failed to start jolokia agent (command was: %s): %s" % (' '.join(args), exc)) + print("Exit status was: %d" % (exc.returncode,)) + print("Output was: %s" % (exc.output,)) raise def stop(self): @@ -220,15 +223,16 @@ class JolokiaAgent(object): try: subprocess.check_output(args, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as exc: - print "Failed to stop jolokia agent (command was: %s): %s" % (' '.join(args), exc) - print "Exit status was: %d" % (exc.returncode,) - print "Output was: %s" % (exc.output,) + print("Failed to stop jolokia agent (command was: %s): %s" % (' '.join(args), exc)) + print("Exit status was: %d" % (exc.returncode,)) + print("Output was: %s" % (exc.output,)) raise def _query(self, body, verbose=True): - request_data = json.dumps(body) + request_data = json.dumps(body).encode("utf-8") url = 'http://%s:8778/jolokia/' % (self.node.network_interfaces['binary'][0],) - response = urlopen(url, data=request_data, timeout=10.0) + req = urllib.request.Request(url) + response = urllib.request.urlopen(req, data=request_data, timeout=10.0) if response.code != 200: raise Exception("Failed to query Jolokia agent; HTTP response code: %d; response: %s" % (response.code, response.readlines())) @@ -237,9 +241,9 @@ class JolokiaAgent(object): if response['status'] != 200: stacktrace = response.get('stacktrace') if stacktrace and verbose: - print "Stacktrace from Jolokia error follows:" + print("Stacktrace from Jolokia error follows:") for line in stacktrace.splitlines(): - print line + print(line) raise Exception("Jolokia agent returned non-200 status: %s" % (response,)) return response http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/metadata_wrapper.py ---------------------------------------------------------------------- diff --git a/tools/metadata_wrapper.py b/tools/metadata_wrapper.py index 267acc5..43bdbfb 100644 --- a/tools/metadata_wrapper.py +++ b/tools/metadata_wrapper.py @@ -1,9 +1,7 @@ from abc import ABCMeta, abstractproperty -class UpdatingMetadataWrapperBase(object): - __metaclass__ = ABCMeta - +class UpdatingMetadataWrapperBase(object, metaclass=ABCMeta): @abstractproperty def _wrapped(self): pass http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/misc.py ---------------------------------------------------------------------- diff --git a/tools/misc.py b/tools/misc.py index 0ca7adf..aa2c084 100644 --- a/tools/misc.py +++ b/tools/misc.py @@ -1,11 +1,15 @@ import os import subprocess import time +import hashlib +import logging + from collections import Mapping from ccmlib.node import Node -from dtest import debug + +logger = logging.getLogger(__name__) # work for cluster started by populate @@ -53,23 +57,65 @@ def generate_ssl_stores(base_dir, passphrase='cassandra'): """ if os.path.exists(os.path.join(base_dir, 'keystore.jks')): - debug("keystores already exists - skipping generation of ssl keystores") + logger.debug("keystores already exists - skipping generation of ssl keystores") return - debug("generating keystore.jks in [{0}]".format(base_dir)) + logger.debug("generating keystore.jks in [{0}]".format(base_dir)) subprocess.check_call(['keytool', '-genkeypair', '-alias', 'ccm_node', '-keyalg', 'RSA', '-validity', '365', '-keystore', os.path.join(base_dir, 'keystore.jks'), '-storepass', passphrase, '-dname', 'cn=Cassandra Node,ou=CCMnode,o=DataStax,c=US', '-keypass', passphrase]) - debug("exporting cert from keystore.jks in [{0}]".format(base_dir)) + logger.debug("exporting cert from keystore.jks in [{0}]".format(base_dir)) subprocess.check_call(['keytool', '-export', '-rfc', '-alias', 'ccm_node', '-keystore', os.path.join(base_dir, 'keystore.jks'), '-file', os.path.join(base_dir, 'ccm_node.cer'), '-storepass', passphrase]) - debug("importing cert into truststore.jks in [{0}]".format(base_dir)) + logger.debug("importing cert into truststore.jks in [{0}]".format(base_dir)) subprocess.check_call(['keytool', '-import', '-file', os.path.join(base_dir, 'ccm_node.cer'), '-alias', 'ccm_node', '-keystore', os.path.join(base_dir, 'truststore.jks'), '-storepass', passphrase, '-noprompt']) +def list_to_hashed_dict(list): + """ + takes a list and hashes the contents and puts them into a dict so the contents can be compared + without order. unfortunately, we need to do a little massaging of our input; the result from + the driver can return a OrderedMapSerializedKey (e.g. [0, 9, OrderedMapSerializedKey([(10, 11)])]) + but our "expected" list is simply a list of elements (or list of list). this means if we + hash the values as is we'll get different results. to avoid this, when we see a dict, + convert the raw values (key, value) into a list and insert that list into a new list + :param list the list to convert into a dict + :return: a dict containing the contents fo the list with the hashed contents + """ + hashed_dict = dict() + for item_lst in list: + normalized_list = [] + for item in item_lst: + if hasattr(item, "items"): + tmp_list = [] + for a, b in item.items(): + tmp_list.append(a) + tmp_list.append(b) + normalized_list.append(tmp_list) + else: + normalized_list.append(item) + list_digest = hashlib.sha256(str(normalized_list).encode('utf-8', 'ignore')).hexdigest() + hashed_dict[list_digest] = normalized_list + return hashed_dict + + +def get_current_test_name(): + """ + See https://docs.pytest.org/en/latest/example/simple.html#pytest-current-test-environment-variable + :return: returns just the name of the current running test name + """ + pytest_current_test = os.environ.get('PYTEST_CURRENT_TEST') + test_splits = pytest_current_test.split("::") + current_test_name = test_splits[len(test_splits) - 1] + current_test_name = current_test_name.replace(" (call)", "") + current_test_name = current_test_name.replace(" (setup)", "") + current_test_name = current_test_name.replace(" (teardown)", "") + return current_test_name + + class ImmutableMapping(Mapping): """ Convenience class for when you want an immutable-ish map. http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/tools/paging.py ---------------------------------------------------------------------- diff --git a/tools/paging.py b/tools/paging.py index a9b6756..0d99bfd 100644 --- a/tools/paging.py +++ b/tools/paging.py @@ -1,7 +1,7 @@ import time from tools.datahelp import flatten_into_set - +from tools.misc import list_to_hashed_dict class Page(object): data = None @@ -165,7 +165,8 @@ class PageAssertionMixin(object): """Can be added to subclasses of unittest.Tester""" def assertEqualIgnoreOrder(self, actual, expected): - return self.assertItemsEqual(actual, expected) + assert list_to_hashed_dict(actual) == list_to_hashed_dict(expected) + def assertIsSubsetOf(self, subset, superset): - self.assertLessEqual(flatten_into_set(subset), flatten_into_set(superset)) + assert flatten_into_set(subset) <= flatten_into_set(superset) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/topology_test.py ---------------------------------------------------------------------- diff --git a/topology_test.py b/topology_test.py index 02d1806..47426f0 100644 --- a/topology_test.py +++ b/topology_test.py @@ -1,21 +1,24 @@ import re import time +import pytest +import logging + from threading import Thread -from unittest import skip from cassandra import ConsistencyLevel from ccmlib.node import TimeoutError, ToolError -from nose.plugins.attrib import attr -from dtest import Tester, debug, create_ks, create_cf +from dtest import Tester, create_ks, create_cf from tools.assertions import assert_almost_equal, assert_all, assert_none from tools.data import insert_c1c2, query_c1c2 -from tools.decorators import no_vnodes, since + +since = pytest.mark.since +logger = logging.getLogger(__name__) class TestTopology(Tester): - def do_not_join_ring_test(self): + def test_do_not_join_ring(self): """ @jira_ticket CASSANDRA-9034 Check that AssertionError is not thrown on SizeEstimatesRecorder before node joins ring @@ -32,19 +35,19 @@ class TestTopology(Tester): node1.stop(gently=False) @since('3.0.11') - def size_estimates_multidc_test(self): + def test_size_estimates_multidc(self): """ Test that primary ranges are correctly generated on system.size_estimates for multi-dc, multi-ks scenario @jira_ticket CASSANDRA-9639 """ - debug("Creating cluster") + logger.debug("Creating cluster") cluster = self.cluster cluster.set_configuration_options(values={'num_tokens': 2}) cluster.populate([2, 1]) node1_1, node1_2, node2_1 = cluster.nodelist() - debug("Setting tokens") + logger.debug("Setting tokens") node1_tokens, node2_tokens, node3_tokens = ['-6639341390736545756,-2688160409776496397', '-2506475074448728501,8473270337963525440', '-3736333188524231709,8673615181726552074'] @@ -53,20 +56,20 @@ class TestTopology(Tester): node2_1.set_configuration_options(values={'initial_token': node3_tokens}) cluster.set_configuration_options(values={'num_tokens': 2}) - debug("Starting cluster") + logger.debug("Starting cluster") cluster.start() out, _, _ = node1_1.nodetool('ring') - debug("Nodetool ring output {}".format(out)) + logger.debug("Nodetool ring output {}".format(out)) - debug("Creating keyspaces") + logger.debug("Creating keyspaces") session = self.patient_cql_connection(node1_1) create_ks(session, 'ks1', 3) create_ks(session, 'ks2', {'dc1': 2}) create_cf(session, 'ks1.cf1', columns={'c1': 'text', 'c2': 'text'}) create_cf(session, 'ks2.cf2', columns={'c1': 'text', 'c2': 'text'}) - debug("Refreshing size estimates") + logger.debug("Refreshing size estimates") node1_1.nodetool('refreshsizeestimates') node1_2.nodetool('refreshsizeestimates') node2_1.nodetool('refreshsizeestimates') @@ -94,7 +97,7 @@ class TestTopology(Tester): 127.0.0.3 8673615181726552074 """ - debug("Checking node1_1 size_estimates primary ranges") + logger.debug("Checking node1_1 size_estimates primary ranges") session = self.patient_exclusive_cql_connection(node1_1) assert_all(session, "SELECT range_start, range_end FROM system.size_estimates " "WHERE keyspace_name = 'ks1'", [['-3736333188524231709', '-2688160409776496397'], @@ -107,7 +110,7 @@ class TestTopology(Tester): ['8473270337963525440', '8673615181726552074'], ['8673615181726552074', '-9223372036854775808']]) - debug("Checking node1_2 size_estimates primary ranges") + logger.debug("Checking node1_2 size_estimates primary ranges") session = self.patient_exclusive_cql_connection(node1_2) assert_all(session, "SELECT range_start, range_end FROM system.size_estimates " "WHERE keyspace_name = 'ks1'", [['-2506475074448728501', '8473270337963525440'], @@ -116,7 +119,7 @@ class TestTopology(Tester): "WHERE keyspace_name = 'ks2'", [['-2506475074448728501', '8473270337963525440'], ['-2688160409776496397', '-2506475074448728501']]) - debug("Checking node2_1 size_estimates primary ranges") + logger.debug("Checking node2_1 size_estimates primary ranges") session = self.patient_exclusive_cql_connection(node2_1) assert_all(session, "SELECT range_start, range_end FROM system.size_estimates " "WHERE keyspace_name = 'ks1'", [['-6639341390736545756', '-3736333188524231709'], @@ -124,7 +127,7 @@ class TestTopology(Tester): assert_none(session, "SELECT range_start, range_end FROM system.size_estimates " "WHERE keyspace_name = 'ks2'") - def simple_decommission_test(self): + def test_simple_decommission(self): """ @jira_ticket CASSANDRA-9912 Check that AssertionError is not thrown on SizeEstimatesRecorder after node is decommissioned @@ -151,8 +154,8 @@ class TestTopology(Tester): # described in 9912. Do not remove it. time.sleep(10) - @skip('Hangs on CI for 2.1') - def concurrent_decommission_not_allowed_test(self): + @pytest.mark.skip(reason='Hangs on CI for 2.1') + def test_concurrent_decommission_not_allowed(self): """ Test concurrent decommission is not allowed """ @@ -179,7 +182,7 @@ class TestTopology(Tester): node2.watch_log_for('DECOMMISSIONING', filename='debug.log') # Launch a second decommission, should fail - with self.assertRaises(ToolError): + with pytest.raises(ToolError): node2.nodetool('decommission') # Check data is correctly forwarded to node1 after node2 is decommissioned @@ -187,17 +190,20 @@ class TestTopology(Tester): node2.watch_log_for('DECOMMISSIONED', from_mark=mark) session = self.patient_cql_connection(node1) session.execute('USE ks') - for n in xrange(0, 10000): + for n in range(0, 10000): query_c1c2(session, n, ConsistencyLevel.ONE) @since('3.10') - def resumable_decommission_test(self): + def test_resumable_decommission(self): """ @jira_ticket CASSANDRA-12008 Test decommission operation is resumable """ - self.ignore_log_patterns = [r'Streaming error occurred', r'Error while decommissioning node', r'Remote peer 127.0.0.2 failed stream session', r'Remote peer 127.0.0.2:7000 failed stream session'] + self.fixture_dtest_setup.ignore_log_patterns = [r'Streaming error occurred', + r'Error while decommissioning node', + r'Remote peer 127.0.0.2 failed stream session', + r'Remote peer 127.0.0.2:7000 failed stream session'] cluster = self.cluster cluster.set_configuration_options(values={'stream_throughput_outbound_megabits_per_sec': 1}) cluster.populate(3, install_byteman=True).start(wait_other_notice=True) @@ -211,7 +217,7 @@ class TestTopology(Tester): insert_c1c2(session, n=10000, consistency=ConsistencyLevel.ALL) # Execute first rebuild, should fail - with self.assertRaises(ToolError): + with pytest.raises(ToolError): if cluster.version() >= '4.0': script = ['./byteman/4.0/decommission_failure_inject.btm'] else: @@ -235,7 +241,7 @@ class TestTopology(Tester): node3.stop(gently=False) session = self.patient_exclusive_cql_connection(node1) session.execute('USE ks') - for i in xrange(0, 10000): + for i in range(0, 10000): query_c1c2(session, i, ConsistencyLevel.ONE) node1.stop(gently=False) node3.start() @@ -244,11 +250,11 @@ class TestTopology(Tester): node3.watch_log_for('Starting listening for CQL clients', from_mark=mark) session = self.patient_exclusive_cql_connection(node3) session.execute('USE ks') - for i in xrange(0, 10000): + for i in range(0, 10000): query_c1c2(session, i, ConsistencyLevel.ONE) - @no_vnodes() - def movement_test(self): + @pytest.mark.no_vnodes + def test_movement(self): cluster = self.cluster # Create an unbalanced ring @@ -281,7 +287,7 @@ class TestTopology(Tester): cluster.cleanup() # Check we can get all the keys - for n in xrange(0, 30000): + for n in range(0, 30000): query_c1c2(session, n, ConsistencyLevel.ONE) # Now the load should be basically even @@ -291,8 +297,8 @@ class TestTopology(Tester): assert_almost_equal(sizes[0], sizes[2]) assert_almost_equal(sizes[1], sizes[2]) - @no_vnodes() - def decommission_test(self): + @pytest.mark.no_vnodes + def test_decommission(self): cluster = self.cluster tokens = cluster.balanced_tokens(4) @@ -317,17 +323,17 @@ class TestTopology(Tester): time.sleep(.5) # Check we can get all the keys - for n in xrange(0, 30000): + for n in range(0, 30000): query_c1c2(session, n, ConsistencyLevel.QUORUM) sizes = [node.data_size() for node in cluster.nodelist() if node.is_running()] - debug(sizes) + logger.debug(sizes) assert_almost_equal(sizes[0], sizes[1]) assert_almost_equal((2.0 / 3.0) * sizes[0], sizes[2]) assert_almost_equal(sizes[2], init_size) - @no_vnodes() - def move_single_node_test(self): + @pytest.mark.no_vnodes + def test_move_single_node(self): """ Test moving a node in a single-node cluster (#4200) """ cluster = self.cluster @@ -350,12 +356,12 @@ class TestTopology(Tester): cluster.cleanup() # Check we can get all the keys - for n in xrange(0, 10000): + for n in range(0, 10000): query_c1c2(session, n, ConsistencyLevel.ONE) @since('3.0') - def decommissioned_node_cant_rejoin_test(self): - ''' + def test_decommissioned_node_cant_rejoin(self): + """ @jira_ticket CASSANDRA-8801 Test that a decommissioned node can't rejoin the cluster by: @@ -365,22 +371,19 @@ class TestTopology(Tester): - asserting that the "decommissioned node won't rejoin" error is in the logs for that node and - asserting that the node is not running. - ''' + """ rejoin_err = 'This node was decommissioned and will not rejoin the ring' - try: - self.ignore_log_patterns = list(self.ignore_log_patterns) - except AttributeError: - self.ignore_log_patterns = [] - self.ignore_log_patterns.append(rejoin_err) + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ + rejoin_err] self.cluster.populate(3).start(wait_for_binary_proto=True) node1, node2, node3 = self.cluster.nodelist() - debug('decommissioning...') + logger.debug('decommissioning...') node3.decommission(force=self.cluster.version() >= '4.0') - debug('stopping...') + logger.debug('stopping...') node3.stop() - debug('attempting restart...') + logger.debug('attempting restart...') node3.start(wait_other_notice=False) try: # usually takes 3 seconds, so give it a generous 15 @@ -390,9 +393,8 @@ class TestTopology(Tester): # let that pass and move on to string assertion below pass - self.assertIn(rejoin_err, - '\n'.join(['\n'.join(err_list) - for err_list in node3.grep_log_for_errors()])) + assert re.search(rejoin_err, + '\n'.join(['\n'.join(err_list) for err_list in node3.grep_log_for_errors()]), re.MULTILINE) # Give the node some time to shut down once it has detected # its invalid state. If it doesn't shut down in the 30 seconds, @@ -401,10 +403,10 @@ class TestTopology(Tester): while start + 30 > time.time() and node3.is_running(): time.sleep(1) - self.assertFalse(node3.is_running()) + assert not node3.is_running() @since('3.0') - def crash_during_decommission_test(self): + def test_crash_during_decommission(self): """ If a node crashes whilst another node is being decommissioned, upon restarting the crashed node should not have invalid entries @@ -412,7 +414,7 @@ class TestTopology(Tester): @jira_ticket CASSANDRA-10231 """ cluster = self.cluster - self.ignore_log_patterns = [r'Streaming error occurred', 'Stream failed'] + self.fixture_dtest_setup.ignore_log_patterns = [r'Streaming error occurred', 'Stream failed'] cluster.populate(3).start(wait_other_notice=True) node1, node2 = cluster.nodelist()[0:2] @@ -425,24 +427,24 @@ class TestTopology(Tester): while t.is_alive(): out = self.show_status(node2) if null_status_pattern.search(out): - debug("Matched null status entry") + logger.debug("Matched null status entry") break - debug("Restarting node2") + logger.debug("Restarting node2") node2.stop(gently=False) node2.start(wait_for_binary_proto=True, wait_other_notice=False) - debug("Waiting for decommission to complete") + logger.debug("Waiting for decommission to complete") t.join() self.show_status(node2) - debug("Sleeping for 30 seconds to allow gossip updates") + logger.debug("Sleeping for 30 seconds to allow gossip updates") time.sleep(30) out = self.show_status(node2) - self.assertFalse(null_status_pattern.search(out)) + assert not null_status_pattern.search(out) @since('3.12') - @attr('resource-intensive') - def stop_decommission_too_few_replicas_multi_dc_test(self): + @pytest.mark.resource_intensive + def test_stop_decommission_too_few_replicas_multi_dc(self): """ Decommission should fail when it would result in the number of live replicas being less than the replication factor. --force should bypass this requirement. @@ -455,22 +457,22 @@ class TestTopology(Tester): session = self.patient_cql_connection(node2) session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'2'};") create_ks(session, 'ks', {'dc1': 2, 'dc2': 2}) - with self.assertRaises(ToolError): + with pytest.raises(ToolError): node4.nodetool('decommission') session.execute('DROP KEYSPACE ks') create_ks(session, 'ks2', 4) - with self.assertRaises(ToolError): + with pytest.raises(ToolError): node4.nodetool('decommission') node4.nodetool('decommission --force') decommissioned = node4.watch_log_for("DECOMMISSIONED", timeout=120) - self.assertTrue(decommissioned, "Node failed to decommission when passed --force") + assert decommissioned, "Node failed to decommission when passed --force" def show_status(self, node): out, _, _ = node.nodetool('status') - debug("Status as reported by node {}".format(node.address())) - debug(out) + logger.debug("Status as reported by node {}".format(node.address())) + logger.debug(out) return out @@ -486,8 +488,8 @@ class DecommissionInParallel(Thread): try: out, err, _ = node.nodetool("decommission") node.watch_log_for("DECOMMISSIONED", from_mark=mark) - debug(out) - debug(err) + logger.debug(out) + logger.debug(err) except ToolError as e: - debug("Decommission failed with exception: " + str(e)) + logger.debug("Decommission failed with exception: " + str(e)) pass http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/ttl_test.py ---------------------------------------------------------------------- diff --git a/ttl_test.py b/ttl_test.py index 46df7b5..b7237d6 100644 --- a/ttl_test.py +++ b/ttl_test.py @@ -1,22 +1,29 @@ import time +import pytest +import logging + from collections import OrderedDict from cassandra import ConsistencyLevel from cassandra.query import SimpleStatement from cassandra.util import sortedset -from dtest import Tester, debug, create_ks +from dtest import Tester, create_ks from tools.assertions import (assert_all, assert_almost_equal, assert_none, assert_row_count, assert_unavailable) -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) @since('2.0') class TestTTL(Tester): """ Test Time To Live Feature """ - def setUp(self): - super(TestTTL, self).setUp() + @pytest.fixture(scope='function', autouse=True) + def fixture_ttl_test_setup(self, fixture_dtest_setup): + self.cluster = fixture_dtest_setup.cluster + self.fixture_dtest_setup = fixture_dtest_setup self.cluster.populate(1).start() [node1] = self.cluster.nodelist() self.session1 = self.patient_cql_connection(node1) @@ -51,9 +58,8 @@ class TestTTL(Tester): if real_time_to_wait > 0: time.sleep(real_time_to_wait) - def default_ttl_test(self): + def test_default_ttl(self): """ Test default_time_to_live specified on a table """ - self.prepare(default_time_to_live=1) start = time.time() self.session1.execute("INSERT INTO ttl_table (key, col1) VALUES (%d, %d)" % (1, 1)) @@ -62,9 +68,8 @@ class TestTTL(Tester): self.smart_sleep(start, 3) assert_row_count(self.session1, 'ttl_table', 0) - def insert_ttl_has_priority_on_defaut_ttl_test(self): + def test_insert_ttl_has_priority_on_defaut_ttl(self): """ Test that a ttl specified during an insert has priority on the default table ttl """ - self.prepare(default_time_to_live=1) start = time.time() @@ -76,9 +81,8 @@ class TestTTL(Tester): self.smart_sleep(start, 7) assert_row_count(self.session1, 'ttl_table', 0) - def insert_ttl_works_without_default_ttl_test(self): + def test_insert_ttl_works_without_default_ttl(self): """ Test that a ttl specified during an insert works even if a table has no default ttl """ - self.prepare() start = time.time() @@ -88,9 +92,8 @@ class TestTTL(Tester): self.smart_sleep(start, 3) assert_row_count(self.session1, 'ttl_table', 0) - def default_ttl_can_be_removed_test(self): + def test_default_ttl_can_be_removed(self): """ Test that default_time_to_live can be removed """ - self.prepare(default_time_to_live=1) start = time.time() @@ -101,9 +104,8 @@ class TestTTL(Tester): self.smart_sleep(start, 1.5) assert_row_count(self.session1, 'ttl_table', 1) - def removing_default_ttl_does_not_affect_existing_rows_test(self): + def test_removing_default_ttl_does_not_affect_existing_rows(self): """ Test that removing a default_time_to_live doesn't affect the existings rows """ - self.prepare(default_time_to_live=1) self.session1.execute("ALTER TABLE ttl_table WITH default_time_to_live = 10;") @@ -123,9 +125,8 @@ class TestTTL(Tester): self.smart_sleep(start, 20) assert_row_count(self.session1, 'ttl_table', 1) - def update_single_column_ttl_test(self): + def test_update_single_column_ttl(self): """ Test that specifying a TTL on a single column works """ - self.prepare() self.session1.execute(""" @@ -137,9 +138,8 @@ class TestTTL(Tester): self.smart_sleep(start, 5) assert_all(self.session1, "SELECT * FROM ttl_table;", [[1, None, 1, 1]]) - def update_multiple_columns_ttl_test(self): + def test_update_multiple_columns_ttl(self): """ Test that specifying a TTL on multiple columns works """ - self.prepare() self.session1.execute(""" @@ -153,12 +153,11 @@ class TestTTL(Tester): self.smart_sleep(start, 4) assert_all(self.session1, "SELECT * FROM ttl_table;", [[1, None, None, None]]) - def update_column_ttl_with_default_ttl_test(self): + def test_update_column_ttl_with_default_ttl(self): """ Test that specifying a column ttl works when a default ttl is set. This test specify a lower ttl for the column than the default ttl. """ - self.prepare(default_time_to_live=8) start = time.time() @@ -190,11 +189,10 @@ class TestTTL(Tester): self.smart_sleep(start, 8) assert_row_count(self.session1, 'ttl_table', 0) - def remove_column_ttl_test(self): + def test_remove_column_ttl(self): """ Test that removing a column ttl works. """ - self.prepare() start = time.time() @@ -206,12 +204,11 @@ class TestTTL(Tester): assert_all(self.session1, "SELECT * FROM ttl_table;", [[1, 42, None, None]]) @since('3.6') - def set_ttl_to_zero_to_default_ttl_test(self): + def test_set_ttl_to_zero_to_default_ttl(self): """ Test that we can remove the default ttl by setting the ttl explicitly to zero. CASSANDRA-11207 """ - self.prepare(default_time_to_live=2) start = time.time() @@ -225,11 +222,10 @@ class TestTTL(Tester): assert_all(self.session1, "SELECT * FROM ttl_table;", [[1, 42, None, None]]) @since('2.1', max_version='3.5') - def remove_column_ttl_with_default_ttl_test(self): + def test_remove_column_ttl_with_default_ttl(self): """ Test that we cannot remove a column ttl when a default ttl is set. """ - self.prepare(default_time_to_live=2) start = time.time() @@ -247,11 +243,10 @@ class TestTTL(Tester): self.smart_sleep(start, 10) assert_row_count(self.session1, 'ttl_table', 0) - def collection_list_ttl_test(self): + def test_collection_list_ttl(self): """ Test that ttl has a granularity of elements using a list collection. """ - self.prepare(default_time_to_live=10) self.session1.execute("ALTER TABLE ttl_table ADD mylist list<int>;""") @@ -268,11 +263,10 @@ class TestTTL(Tester): self.smart_sleep(start, 12) assert_row_count(self.session1, 'ttl_table', 0) - def collection_set_ttl_test(self): + def test_collection_set_ttl(self): """ Test that ttl has a granularity of elements using a set collection. """ - self.prepare(default_time_to_live=10) self.session1.execute("ALTER TABLE ttl_table ADD myset set<int>;""") @@ -297,11 +291,10 @@ class TestTTL(Tester): self.smart_sleep(start, 12) assert_row_count(self.session1, 'ttl_table', 0) - def collection_map_ttl_test(self): + def test_collection_map_ttl(self): """ Test that ttl has a granularity of elements using a map collection. """ - self.prepare(default_time_to_live=6) self.session1.execute("ALTER TABLE ttl_table ADD mymap map<int, int>;""") @@ -326,7 +319,7 @@ class TestTTL(Tester): self.smart_sleep(start, 8) assert_row_count(self.session1, 'ttl_table', 0) - def delete_with_ttl_expired_test(self): + def test_delete_with_ttl_expired(self): """ Updating a row with a ttl does not prevent deletion, test for CASSANDRA-6363 """ @@ -344,13 +337,14 @@ class TestTTL(Tester): class TestDistributedTTL(Tester): """ Test Time To Live Feature in a distributed environment """ - def setUp(self): - super(TestDistributedTTL, self).setUp() - self.cluster.populate(2).start() - [self.node1, self.node2] = self.cluster.nodelist() - self.session1 = self.patient_cql_connection(self.node1) + @pytest.fixture(scope='function', autouse=True) + def fixture_set_cluster_settings(self, fixture_dtest_setup): + fixture_dtest_setup.cluster.populate(2).start() + [self.node1, self.node2] = fixture_dtest_setup.cluster.nodelist() + self.session1 = fixture_dtest_setup.patient_cql_connection(self.node1) create_ks(self.session1, 'ks', 2) + def prepare(self, default_time_to_live=None): self.session1.execute("DROP TABLE IF EXISTS ttl_table;") query = """ @@ -366,11 +360,10 @@ class TestDistributedTTL(Tester): self.session1.execute(query) - def ttl_is_replicated_test(self): + def test_ttl_is_replicated(self): """ Test that the ttl setting is replicated properly on all nodes """ - self.prepare(default_time_to_live=5) session1 = self.patient_exclusive_cql_connection(self.node1) session2 = self.patient_exclusive_cql_connection(self.node2) @@ -392,15 +385,14 @@ class TestDistributedTTL(Tester): # since the two queries are not executed simultaneously, the remaining # TTLs can differ by one second - self.assertLessEqual(abs(ttl_session1[0][0] - ttl_session2[0][0]), 1) + assert abs(ttl_session1[0][0] - ttl_session2[0][0]) <= 1 time.sleep(7) assert_none(session1, "SELECT * FROM ttl_table;", cl=ConsistencyLevel.ALL) - def ttl_is_respected_on_delayed_replication_test(self): + def test_ttl_is_respected_on_delayed_replication(self): """ Test that ttl is respected on delayed replication """ - self.prepare() self.node2.stop() self.session1.execute(""" @@ -437,13 +429,12 @@ class TestDistributedTTL(Tester): ttl_1 = self.session1.execute('SELECT ttl(col1) FROM ttl_table;')[0][0] ttl_2 = session2.execute('SELECT ttl(col1) FROM ttl_table;')[0][0] - debug("ttl_1 is {}:".format(ttl_1)) - debug("ttl_2 is {}:".format(ttl_2)) - self.assertLessEqual(abs(ttl_1 - ttl_2), 1) + logger.debug("ttl_1 is {}:".format(ttl_1)) + logger.debug("ttl_2 is {}:".format(ttl_2)) + assert abs(ttl_1 - ttl_2) <= 1 - def ttl_is_respected_on_repair_test(self): + def test_ttl_is_respected_on_repair(self): """ Test that ttl is respected on repair """ - self.prepare() self.session1.execute(""" ALTER KEYSPACE ks WITH REPLICATION = http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/udtencoding_test.py ---------------------------------------------------------------------- diff --git a/udtencoding_test.py b/udtencoding_test.py index 59c0e48..eb5929e 100644 --- a/udtencoding_test.py +++ b/udtencoding_test.py @@ -1,12 +1,15 @@ import time +import logging from tools.assertions import assert_invalid from dtest import Tester, create_ks +logger = logging.getLogger(__name__) + class TestUDTEncoding(Tester): - def udt_test(self): + def test_udt(self): """ Test (somewhat indirectly) that user queries involving UDT's are properly encoded (due to driver not recognizing UDT syntax) """ cluster = self.cluster http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_crc_check_chance_test.py ---------------------------------------------------------------------- diff --git a/upgrade_crc_check_chance_test.py b/upgrade_crc_check_chance_test.py index ec758c2..3ad1b59 100644 --- a/upgrade_crc_check_chance_test.py +++ b/upgrade_crc_check_chance_test.py @@ -1,21 +1,26 @@ -from unittest import skipIf +import pytest -from dtest import OFFHEAP_MEMTABLES, Tester, debug +from dtest import Tester from tools.assertions import assert_crc_check_chance_equal, assert_one -from tools.decorators import since +since = pytest.mark.since + +@pytest.mark.upgrade_test @since('3.0') class TestCrcCheckChanceUpgrade(Tester): - ignore_log_patterns = ( - # This one occurs if we do a non-rolling upgrade, the node - # it's trying to send the migration to hasn't started yet, - # and when it does, it gets replayed and everything is fine. - r'Can\'t send migration request: node.*is down', - ) - - @skipIf(OFFHEAP_MEMTABLES, 'offheap_objects are not available in 3.0') - def crc_check_chance_upgrade_test(self): + + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = ( + # This one occurs if we do a non-rolling upgrade, the node + # it's trying to send the migration to hasn't started yet, + # and when it does, it gets replayed and everything is fine. + r'Can\'t send migration request: node.*is down', + ) + + @pytest.mark.no_offheap_memtables + def test_crc_check_chance_upgrade(self): """ Tests behavior of compression property crc_check_chance after upgrade to 3.0, when it was promoted to a top-level property @@ -93,46 +98,46 @@ class TestCrcCheckChanceUpgrade(Tester): assert_one(session, "SELECT * FROM ks.cf1 WHERE id=7", [7, 0]) session.shutdown() - debug('Test completed successfully') + logger.debug('Test completed successfully') def verify_old_crc_check_chance(self, node): session = self.patient_exclusive_cql_connection(node) session.cluster.refresh_schema_metadata(0) meta = session.cluster.metadata.keyspaces['ks'].tables['cf1'] - debug(meta.options['compression_parameters']) - self.assertEqual('{"crc_check_chance":"0.6","sstable_compression":"org.apache.cassandra.io.compress.DeflateCompressor","chunk_length_kb":"256"}', - meta.options['compression_parameters']) + logger.debug(meta.options['compression_parameters']) + assert '{"crc_check_chance":"0.6","sstable_compression":"org.apache.cassandra.io.compress.DeflateCompressor","chunk_length_kb":"256"}' \ + == meta.options['compression_parameters'] session.shutdown() def verify_new_crc_check_chance(self, node): session = self.patient_exclusive_cql_connection(node) session.cluster.refresh_schema_metadata(0) meta = session.cluster.metadata.keyspaces['ks'].tables['cf1'] - self.assertEqual('org.apache.cassandra.io.compress.DeflateCompressor', meta.options['compression']['class']) - self.assertEqual('256', meta.options['compression']['chunk_length_in_kb']) + assert 'org.apache.cassandra.io.compress.DeflateCompressor' == meta.options['compression']['class'] + assert '256' == meta.options['compression']['chunk_length_in_kb'] assert_crc_check_chance_equal(session, "cf1", 0.6) session.shutdown() def upgrade_to_version(self, tag, node): format_args = {'node': node.name, 'tag': tag} - debug('Upgrading node {node} to {tag}'.format(**format_args)) + logger.debug('Upgrading node {node} to {tag}'.format(**format_args)) # drain and shutdown node.drain() node.watch_log_for("DRAINED") node.stop(wait_other_notice=False) - debug('{node} stopped'.format(**format_args)) + logger.debug('{node} stopped'.format(**format_args)) # Update Cassandra Directory - debug('Updating version to tag {tag}'.format(**format_args)) + logger.debug('Updating version to tag {tag}'.format(**format_args)) - debug('Set new cassandra dir for {node}: {tag}'.format(**format_args)) + logger.debug('Set new cassandra dir for {node}: {tag}'.format(**format_args)) node.set_install_dir(version='git:' + tag, verbose=True) # Restart node on new version - debug('Starting {node} on new version ({tag})'.format(**format_args)) + logger.debug('Starting {node} on new version ({tag})'.format(**format_args)) # Setup log4j / logback again (necessary moving from 2.0 -> 2.1): node.set_log_level("INFO") node.start(wait_other_notice=True, wait_for_binary_proto=True) - debug('Running upgradesstables') + logger.debug('Running upgradesstables') node.nodetool('upgradesstables -a') - debug('Upgrade of {node} complete'.format(**format_args)) + logger.debug('Upgrade of {node} complete'.format(**format_args)) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_internal_auth_test.py ---------------------------------------------------------------------- diff --git a/upgrade_internal_auth_test.py b/upgrade_internal_auth_test.py index 62acac9..a4de1ca 100644 --- a/upgrade_internal_auth_test.py +++ b/upgrade_internal_auth_test.py @@ -1,33 +1,47 @@ import time -from unittest import skipIf +import pytest +import logging from cassandra import Unauthorized from ccmlib.common import is_win from ccmlib.node import Node -from dtest import OFFHEAP_MEMTABLES, Tester, debug +from dtest_setup_overrides import DTestSetupOverrides + +from dtest import Tester from tools.assertions import assert_all, assert_invalid -from tools.decorators import since from tools.misc import ImmutableMapping +since = pytest.mark.since +logger = logging.getLogger(__name__) + +@pytest.mark.upgrade_test @since('2.2') class TestAuthUpgrade(Tester): - cluster_options = ImmutableMapping({'authenticator': 'PasswordAuthenticator', - 'authorizer': 'CassandraAuthorizer'}) - ignore_log_patterns = ( - # This one occurs if we do a non-rolling upgrade, the node - # it's trying to send the migration to hasn't started yet, - # and when it does, it gets replayed and everything is fine. - r'Can\'t send migration request: node.*is down', + + @pytest.fixture(scope='function', autouse=True) + def fixture_dtest_setup_overrides(self): + dtest_setup_overrides = DTestSetupOverrides() + dtest_setup_overrides.cluster_options = ImmutableMapping({'authenticator': 'PasswordAuthenticator', + 'authorizer': 'CassandraAuthorizer'}) + return dtest_setup_overrides + + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = ( + # This one occurs if we do a non-rolling upgrade, the node + # it's trying to send the migration to hasn't started yet, + # and when it does, it gets replayed and everything is fine. + r'Can\'t send migration request: node.*is down', ) - def upgrade_to_22_test(self): + def test_upgrade_to_22(self): self.do_upgrade_with_internal_auth("github:apache/cassandra-2.2") @since('3.0') - @skipIf(OFFHEAP_MEMTABLES, 'offheap_objects are not available in 3.0') - def upgrade_to_30_test(self): + @pytest.mark.no_offheap_memtables + def test_upgrade_to_30(self): self.do_upgrade_with_internal_auth("github:apache/cassandra-3.0") @since('2.2', max_version='3.X') @@ -72,8 +86,10 @@ class TestAuthUpgrade(Tester): replacement_address = node1.address() replacement_node = Node('replacement', cluster=self.cluster, auto_bootstrap=True, - thrift_interface=(replacement_address, 9160), storage_interface=(replacement_address, 7000), - jmx_port='7400', remote_debug_port='0', initial_token=None, binary_interface=(replacement_address, 9042)) + thrift_interface=(replacement_address, 9160), + storage_interface=(replacement_address, 7000), + jmx_port='7400', remote_debug_port='0', initial_token=None, + binary_interface=(replacement_address, 9042)) self.set_node_to_current_version(replacement_node) cluster.add(replacement_node, True) @@ -150,7 +166,7 @@ class TestAuthUpgrade(Tester): session.execute('DROP TABLE system_auth.permissions', timeout=60) # and we should still be able to authenticate and check authorization self.check_permissions(node1, True) - debug('Test completed successfully') + logger.debug('Test completed successfully') def check_permissions(self, node, upgraded): # use an exclusive connection to ensure we only talk to the specified node @@ -185,32 +201,32 @@ class TestAuthUpgrade(Tester): def upgrade_to_version(self, tag, node): format_args = {'node': node.name, 'tag': tag} - debug('Upgrading node {node} to {tag}'.format(**format_args)) + logger.debug('Upgrading node {node} to {tag}'.format(**format_args)) # drain and shutdown node.drain() node.watch_log_for("DRAINED") node.stop(wait_other_notice=False) - debug('{node} stopped'.format(**format_args)) + logger.debug('{node} stopped'.format(**format_args)) # Ignore errors before upgrade on Windows if is_win(): node.mark_log_for_errors() # Update Cassandra Directory - debug('Updating version to tag {tag}'.format(**format_args)) + logger.debug('Updating version to tag {tag}'.format(**format_args)) node.set_install_dir(version=tag, verbose=True) - debug('Set new cassandra dir for {node}: {tag}'.format(**format_args)) + logger.debug('Set new cassandra dir for {node}: {tag}'.format(**format_args)) # Restart node on new version - debug('Starting {node} on new version ({tag})'.format(**format_args)) + logger.debug('Starting {node} on new version ({tag})'.format(**format_args)) # Setup log4j / logback again (necessary moving from 2.0 -> 2.1): node.set_log_level("INFO") node.start(wait_other_notice=True) # wait for the conversion of legacy data to either complete or fail # (because not enough upgraded nodes are available yet) - debug('Waiting for conversion of legacy data to complete or fail') + logger.debug('Waiting for conversion of legacy data to complete or fail') node.watch_log_for('conversion of legacy permissions') - debug('Running upgradesstables') + logger.debug('Running upgradesstables') node.nodetool('upgradesstables -a') - debug('Upgrade of {node} complete'.format(**format_args)) + logger.debug('Upgrade of {node} complete'.format(**format_args)) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_tests/bootstrap_upgrade_test.py ---------------------------------------------------------------------- diff --git a/upgrade_tests/bootstrap_upgrade_test.py b/upgrade_tests/bootstrap_upgrade_test.py index 43f735a..efe8ae4 100644 --- a/upgrade_tests/bootstrap_upgrade_test.py +++ b/upgrade_tests/bootstrap_upgrade_test.py @@ -1,9 +1,12 @@ -from bootstrap_test import BaseBootstrapTest -from tools.decorators import since, no_vnodes +import pytest +from bootstrap_test import TestBootstrap -class TestBootstrapUpgrade(BaseBootstrapTest): - __test__ = True +since = pytest.mark.since + + +@pytest.mark.upgrade_test +class TestBootstrapUpgrade(TestBootstrap): """ @jira_ticket CASSANDRA-11841 @@ -11,7 +14,7 @@ class TestBootstrapUpgrade(BaseBootstrapTest): In particular, we want to test that keep-alive is not sent to a node with version < 3.10 """ - @no_vnodes() + @pytest.mark.no_vnodes @since('3.10', max_version='3.99') - def simple_bootstrap_test_mixed_versions(self): + def test_simple_bootstrap_mixed_versions(self): self._base_bootstrap_test(bootstrap_from_version="3.5") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org