http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/batch_test.py ---------------------------------------------------------------------- diff --git a/batch_test.py b/batch_test.py index f8b9881..7e7b6b2 100644 --- a/batch_test.py +++ b/batch_test.py @@ -1,22 +1,24 @@ import sys import time -from unittest import skipIf -from nose.tools import assert_greater_equal +import pytest +import logging from cassandra import ConsistencyLevel, Timeout, Unavailable from cassandra.query import SimpleStatement -from dtest import Tester, create_ks, debug +from dtest import Tester, create_ks from tools.assertions import (assert_all, assert_invalid, assert_one, assert_unavailable) -from tools.decorators import since from tools.jmxutils import (JolokiaAgent, make_mbean, remove_perf_disable_shared_mem) +since = pytest.mark.since +logger = logging.getLogger(__name__) + class TestBatch(Tester): - def empty_batch_throws_no_error_test(self): + def test_empty_batch_throws_no_error(self): """ @jira_ticket CASSANDRA-10711 """ @@ -26,9 +28,9 @@ class TestBatch(Tester): APPLY BATCH; """) for node in self.cluster.nodelist(): - self.assertEquals(0, len(node.grep_log_for_errors())) + assert 0 == len(node.grep_log_for_errors()) - def counter_batch_accepts_counter_mutations_test(self): + def test_counter_batch_accepts_counter_mutations(self): """ Test that counter batch accepts counter mutations """ session = self.prepare() session.execute(""" @@ -40,7 +42,7 @@ class TestBatch(Tester): """) assert_all(session, "SELECT total FROM clicks", [[1], [1], [1]]) - def counter_batch_rejects_regular_mutations_test(self): + def test_counter_batch_rejects_regular_mutations(self): """ Test that counter batch rejects non-counter mutations """ session = self.prepare() err = "Cannot include non-counter statement in a counter batch" @@ -54,7 +56,7 @@ class TestBatch(Tester): APPLY BATCH """, matching=err) - def logged_batch_accepts_regular_mutations_test(self): + def 
test_logged_batch_accepts_regular_mutations(self): """ Test that logged batch accepts regular mutations """ session = self.prepare() session.execute(""" @@ -63,10 +65,10 @@ class TestBatch(Tester): INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner') APPLY BATCH """) - assert_all(session, "SELECT * FROM users", [[1, u'Will', u'Turner'], [0, u'Jack', u'Sparrow']]) + assert_all(session, "SELECT * FROM users", [[1, 'Will', 'Turner'], [0, 'Jack', 'Sparrow']]) @since('3.0') - def logged_batch_gcgs_below_threshold_single_table_test(self): + def test_logged_batch_gcgs_below_threshold_single_table(self): """ Test that logged batch accepts regular mutations """ session = self.prepare() @@ -84,11 +86,11 @@ class TestBatch(Tester): "batchlog entries, so setting gc_grace_seconds too low on tables " "involved in an atomic batch might cause batchlog entries to expire " "before being replayed.") - debug(warning) - self.assertEquals(1, len(warning), "Cannot find the gc_grace_seconds warning message.") + logger.debug(warning) + assert 1 == len(warning), "Cannot find the gc_grace_seconds warning message." @since('3.0') - def logged_batch_gcgs_below_threshold_multi_table_test(self): + def test_logged_batch_gcgs_below_threshold_multi_table(self): """ Test that logged batch accepts regular mutations """ session = self.prepare() session.execute("ALTER TABLE users WITH gc_grace_seconds = 0") @@ -111,11 +113,11 @@ class TestBatch(Tester): "batchlog entries, so setting gc_grace_seconds too low on tables " "involved in an atomic batch might cause batchlog entries to expire " "before being replayed.") - debug(warning) - self.assertEquals(1, len(warning), "Cannot find the gc_grace_seconds warning message.") + logger.debug(warning) + assert 1 == len(warning), "Cannot find the gc_grace_seconds warning message." 
@since('3.0') - def unlogged_batch_gcgs_below_threshold_should_not_print_warning_test(self): + def test_unlogged_batch_gcgs_below_threshold_should_not_print_warning(self): """ Test that logged batch accepts regular mutations """ session = self.prepare() session.execute("ALTER TABLE users WITH gc_grace_seconds = 0") @@ -127,10 +129,10 @@ class TestBatch(Tester): """) node1 = self.cluster.nodelist()[0] warning = node1.grep_log("setting a too low gc_grace_seconds on tables involved in an atomic batch") - debug(warning) - self.assertEquals(0, len(warning), "Cannot find the gc_grace_seconds warning message.") + logger.debug(warning) + assert 0 == len(warning), "Cannot find the gc_grace_seconds warning message." - def logged_batch_rejects_counter_mutations_test(self): + def test_logged_batch_rejects_counter_mutations(self): """ Test that logged batch rejects counter mutations """ session = self.prepare() err = "Cannot include a counter statement in a logged batch" @@ -143,7 +145,7 @@ class TestBatch(Tester): APPLY BATCH """, matching=err) - def unlogged_batch_accepts_regular_mutations_test(self): + def test_unlogged_batch_accepts_regular_mutations(self): """ Test that unlogged batch accepts regular mutations """ session = self.prepare() session.execute(""" @@ -152,9 +154,9 @@ class TestBatch(Tester): INSERT INTO users (id, firstname, lastname) VALUES (2, 'Elizabeth', 'Swann') APPLY BATCH """) - assert_all(session, "SELECT * FROM users", [[0, u'Jack', u'Sparrow'], [2, u'Elizabeth', u'Swann']]) + assert_all(session, "SELECT * FROM users", [[0, 'Jack', 'Sparrow'], [2, 'Elizabeth', 'Swann']]) - def unlogged_batch_rejects_counter_mutations_test(self): + def test_unlogged_batch_rejects_counter_mutations(self): """ Test that unlogged batch rejects counter mutations """ session = self.prepare() err = "Counter and non-counter mutations cannot exist in the same batch" @@ -167,7 +169,7 @@ class TestBatch(Tester): APPLY BATCH """, matching=err) - def 
logged_batch_throws_uae_test(self): + def test_logged_batch_throws_uae(self): """ Test that logged batch throws UAE if there aren't enough live nodes """ session = self.prepare(nodes=3) [node.stop(wait_other_notice=True) for node in self.cluster.nodelist()[1:]] @@ -179,7 +181,7 @@ class TestBatch(Tester): APPLY BATCH """) - def logged_batch_doesnt_throw_uae_test(self): + def test_logged_batch_doesnt_throw_uae(self): """ Test that logged batch DOES NOT throw UAE if there are at least 2 live nodes """ session = self.prepare(nodes=3) self.cluster.nodelist()[-1].stop(wait_other_notice=True) @@ -192,10 +194,10 @@ class TestBatch(Tester): session.execute(query) self.cluster.nodelist()[-1].start(wait_for_binary_proto=True, wait_other_notice=True) - assert_all(session, "SELECT * FROM users", [[1, u'Will', u'Turner'], [0, u'Jack', u'Sparrow']], + assert_all(session, "SELECT * FROM users", [[1, 'Will', 'Turner'], [0, 'Jack', 'Sparrow']], cl=ConsistencyLevel.ALL) - def acknowledged_by_batchlog_not_set_when_batchlog_write_fails_test(self): + def test_acknowledged_by_batchlog_not_set_when_batchlog_write_fails(self): """ Test that acknowledged_by_batchlog is False if batchlog can't be written """ session = self.prepare(nodes=3, compression=False) # kill 2 of the 3 nodes (all the batchlog write candidates). @@ -207,7 +209,7 @@ class TestBatch(Tester): APPLY BATCH """, ConsistencyLevel.ONE, received_responses=0) - def acknowledged_by_batchlog_set_when_batchlog_write_succeeds_test(self): + def test_acknowledged_by_batchlog_set_when_batchlog_write_succeeds(self): """ Test that acknowledged_by_batchlog is True if batchlog can be written """ session = self.prepare(nodes=3, compression=False) # kill one of the nodes so that batchlog will be written, but the write will fail. 
@@ -219,7 +221,7 @@ class TestBatch(Tester): APPLY BATCH """, ConsistencyLevel.THREE, received_responses=2) - def batch_uses_proper_timestamp_test(self): + def test_batch_uses_proper_timestamp(self): """ Test that each statement will be executed with provided BATCH timestamp """ session = self.prepare() session.execute(""" @@ -231,7 +233,7 @@ class TestBatch(Tester): query = "SELECT id, writetime(firstname), writetime(lastname) FROM users" assert_all(session, query, [[1, 1111111111111111, 1111111111111111], [0, 1111111111111111, 1111111111111111]]) - def only_one_timestamp_is_valid_test(self): + def test_only_one_timestamp_is_valid(self): """ Test that TIMESTAMP must not be used in the statements within the batch. """ session = self.prepare() assert_invalid(session, """ @@ -241,7 +243,7 @@ class TestBatch(Tester): APPLY BATCH """, matching="Timestamp must be set either on BATCH or individual statements") - def each_statement_in_batch_uses_proper_timestamp_test(self): + def test_each_statement_in_batch_uses_proper_timestamp(self): """ Test that each statement will be executed with its own timestamp """ session = self.prepare() session.execute(""" @@ -254,9 +256,8 @@ class TestBatch(Tester): query = "SELECT id, writetime(firstname), writetime(lastname) FROM users" assert_all(session, query, [[1, 1111111111111112, 1111111111111112], [0, 1111111111111111, 1111111111111111]]) - def multi_table_batch_for_10554_test(self): + def test_multi_table_batch_for_10554(self): """ Test a batch on 2 tables having different columns, restarting the node afterwards, to reproduce CASSANDRA-10554 """ - session = self.prepare() # prepare() adds users and clicks but clicks is a counter table, so adding a random other table for this test. 
@@ -289,7 +290,7 @@ class TestBatch(Tester): assert_one(session, "SELECT * FROM dogs", [0, 'Pluto']) @since('3.0', max_version='3.x') - def logged_batch_compatibility_1_test(self): + def test_logged_batch_compatibility_1(self): """ @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster. @@ -298,7 +299,7 @@ class TestBatch(Tester): self._logged_batch_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4) @since('3.0', max_version='3.x') - def batchlog_replay_compatibility_1_test(self): + def test_batchlog_replay_compatibility_1(self): """ @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster. @@ -307,8 +308,8 @@ class TestBatch(Tester): self._batchlog_replay_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4) @since('3.0', max_version='3.x') - @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+') - def logged_batch_compatibility_2_test(self): + @pytest.mark.skipif(sys.platform == 'win32', reason='Windows production support only on 2.2+') + def test_logged_batch_compatibility_2(self): """ @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster. @@ -317,8 +318,8 @@ class TestBatch(Tester): self._logged_batch_compatibility_test(0, 1, 'github:apache/cassandra-2.1', 2, 3) @since('3.0', max_version='3.x') - @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+') - def logged_batch_compatibility_3_test(self): + @pytest.mark.skipif(sys.platform == 'win32', reason='Windows production support only on 2.2+') + def test_logged_batch_compatibility_3(self): """ @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster. 
@@ -327,7 +328,7 @@ class TestBatch(Tester): self._logged_batch_compatibility_test(0, 2, 'github:apache/cassandra-2.1', 1, 3) @since('3.0', max_version='3.x') - def logged_batch_compatibility_4_test(self): + def test_logged_batch_compatibility_4(self): """ @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster. @@ -336,7 +337,7 @@ class TestBatch(Tester): self._logged_batch_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4) @since('3.0', max_version='3.x') - def batchlog_replay_compatibility_4_test(self): + def test_batchlog_replay_compatibility_4(self): """ @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster. @@ -345,8 +346,8 @@ class TestBatch(Tester): self._batchlog_replay_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4) @since('3.0', max_version='3.x') - @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+') - def logged_batch_compatibility_5_test(self): + @pytest.mark.skipif(sys.platform == 'win32', reason='Windows production support only on 2.2+') + def test_logged_batch_compatibility_5(self): """ @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster. 
@@ -365,7 +366,7 @@ class TestBatch(Tester): session.execute(query) rows = session.execute("SELECT id, firstname, lastname FROM users") res = sorted(rows) - self.assertEquals([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(res[0]), list(res[1])]) + assert [[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']] == [list(res[0]), list(res[1])] def _batchlog_replay_compatibility_test(self, coordinator_idx, current_nodes, previous_version, previous_nodes, protocol_version): session = self.prepare_mixed(coordinator_idx, current_nodes, previous_version, previous_nodes, @@ -373,7 +374,7 @@ class TestBatch(Tester): coordinator = self.cluster.nodelist()[coordinator_idx] coordinator.byteman_submit(['./byteman/fail_after_batchlog_write.btm']) - debug("Injected byteman scripts to enable batchlog replay {}".format(coordinator.name)) + logger.debug("Injected byteman scripts to enable batchlog replay {}".format(coordinator.name)) query = """ BEGIN BATCH @@ -387,7 +388,7 @@ class TestBatch(Tester): # 2 * write_request_timeout_in_ms ms: 1x timeout for all mutations to be written, # and another 1x timeout for batch remove mutation to be received.
delay = 2 * coordinator.get_conf_option('write_request_timeout_in_ms') / 1000.0 + 1 - debug('Sleeping for {}s for the batches to not be skipped'.format(delay)) + logger.debug('Sleeping for {}s for the batches to not be skipped'.format(delay)) time.sleep(delay) total_batches_replayed = 0 @@ -398,18 +399,18 @@ class TestBatch(Tester): continue with JolokiaAgent(n) as jmx: - debug('Forcing batchlog replay for {}'.format(n.name)) + logger.debug('Forcing batchlog replay for {}'.format(n.name)) jmx.execute_method(blm, 'forceBatchlogReplay') batches_replayed = jmx.read_attribute(blm, 'TotalBatchesReplayed') - debug('{} batches replayed on node {}'.format(batches_replayed, n.name)) + logger.debug('{} batches replayed on node {}'.format(batches_replayed, n.name)) total_batches_replayed += batches_replayed - assert_greater_equal(total_batches_replayed, 2) + assert total_batches_replayed >= 2 for node in self.cluster.nodelist(): session = self.patient_exclusive_cql_connection(node, protocol_version=protocol_version) rows = sorted(session.execute('SELECT id, firstname, lastname FROM ks.users')) - self.assertEqual([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(rows[0]), list(rows[1])]) + assert [[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']] == [list(rows[0]), list(rows[1])] def assert_timedout(self, session, query, cl, acknowledged_by=None, received_responses=None): @@ -420,12 +421,12 @@ class TestBatch(Tester): if received_responses is not None: msg = "Expecting received_responses to be {}, got: {}".format( received_responses, e.received_responses,) - self.assertEqual(e.received_responses, received_responses, msg) + assert e.received_responses == received_responses, msg except Unavailable as e: if received_responses is not None: msg = "Expecting alive_replicas to be {}, got: {}".format( received_responses, e.alive_replicas,) - self.assertEqual(e.alive_replicas, received_responses, msg) + assert e.alive_replicas == received_responses, msg except Exception as e: assert
False, "Expecting TimedOutException, got:" + str(e) else: @@ -434,7 +435,7 @@ class TestBatch(Tester): def prepare(self, nodes=1, compression=True, version=None, protocol_version=None, install_byteman=False): if version: self.cluster.set_install_dir(version=version) - debug("Set cassandra dir to {}".format(self.cluster.get_install_dir())) + logger.debug("Set cassandra dir to {}".format(self.cluster.get_install_dir())) self.cluster.populate(nodes, install_byteman=install_byteman) @@ -449,7 +450,7 @@ class TestBatch(Tester): return session def create_schema(self, session, rf): - debug('Creating schema...') + logger.debug('Creating schema...') create_ks(session, 'ks', rf) session.execute(""" @@ -472,19 +473,22 @@ class TestBatch(Tester): time.sleep(.5) - def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True, protocol_version=None, install_byteman=False): - debug("Testing with {} node(s) at version '{}', {} node(s) at current version" + def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True, + protocol_version=None, install_byteman=False): + logger.debug("Testing with {} node(s) at version '{}', {} node(s) at current version" .format(previous_nodes, previous_version, current_nodes)) # start a cluster using the previous version - self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version, install_byteman=install_byteman) + self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version, + install_byteman=install_byteman) # then upgrade the current nodes to the current version but not the previous nodes - for i in xrange(current_nodes): + for i in range(current_nodes): node = self.cluster.nodelist()[i] self.upgrade_node(node) - session = self.patient_exclusive_cql_connection(self.cluster.nodelist()[coordinator_idx], protocol_version=protocol_version) + session = 
self.patient_exclusive_cql_connection(self.cluster.nodelist()[coordinator_idx], + protocol_version=protocol_version) session.execute('USE ks') return session @@ -492,13 +496,13 @@ class TestBatch(Tester): """ Upgrade a node to the current version """ - debug('Upgrading {} to the current version'.format(node.name)) - debug('Shutting down {}'.format(node.name)) + logger.debug('Upgrading {} to the current version'.format(node.name)) + logger.debug('Shutting down {}'.format(node.name)) node.stop(wait_other_notice=False) self.set_node_to_current_version(node) - debug("Set cassandra dir for {} to {}".format(node.name, node.get_install_dir())) + logger.debug("Set cassandra dir for {} to {}".format(node.name, node.get_install_dir())) # needed for jmx remove_perf_disable_shared_mem(node) # Restart nodes on new version - debug('Starting {} on new version ({})'.format(node.name, node.get_cassandra_version())) + logger.debug('Starting {} on new version ({})'.format(node.name, node.get_cassandra_version())) node.start(wait_other_notice=True, wait_for_binary_proto=True)
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/bin/collect_known_failures.py ---------------------------------------------------------------------- diff --git a/bin/collect_known_failures.py b/bin/collect_known_failures.py deleted file mode 100644 index cf66cd1..0000000 --- a/bin/collect_known_failures.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -A script that runs the tests with --collect-only, but instead of just printing -the tests' names, prints the information added by the tools.decorators.known_failure -decorator. - -This is basically a wrapper around the `nosetests` command, so it takes the -same arguments, though it appends some arguments to sys.argv. In particular, -if you want to look at particular kinds of known failures, use the `-a` -parameter on this script as you would for any of the known_failures attributes. -In addition, you should call it from the same directory from which you'd call -`nosetests`. -""" - -import json -import os -import sys -from functools import partial - -import nose - - -class PrintJiraURLPlugin(nose.plugins.Plugin): - enabled = True - - def options(self, parser, env): - super(PrintJiraURLPlugin, self).configure(parser, env) - - def testName(self, test): - _, test_module, test_name = test.address() - test_method_name = test_name.split('.')[-1] - test_method = getattr(test.test, test_method_name) - - get_attr_for_current_method = partial( - nose.plugins.attrib.get_method_attr, - method=test_method, - cls=test.test, - ) - - failure_annotations = get_attr_for_current_method(attr_name='known_failure') - - return json.dumps({ - 'module': test_module, - 'name': test_name, - 'failure_annotations': failure_annotations - }) - - -if __name__ == '__main__': - argv = sys.argv + ['--collect-only', '-v'] - - # The tests need a CASSANDRA_VERSION or CASSANDRA_DIR environment variable - # to run at all, so we specify it here. 
However, we have to do so by - # modifying os.environ, rather than using the env parameter to nose.main, - # because env does not do what you think it does: - # http://stackoverflow.com/a/28611124 - os.environ['CASSANDRA_VERSION'] = 'git:trunk' - - nose.main(addplugins=[PrintJiraURLPlugin()], argv=argv) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/bootstrap_test.py ---------------------------------------------------------------------- diff --git a/bootstrap_test.py b/bootstrap_test.py index efa84ec..22dddcd 100644 --- a/bootstrap_test.py +++ b/bootstrap_test.py @@ -5,34 +5,40 @@ import shutil import tempfile import threading import time +import logging +import signal from cassandra import ConsistencyLevel from cassandra.concurrent import execute_concurrent_with_args from ccmlib.node import NodeError -from dtest import DISABLE_VNODES, Tester, debug, create_ks, create_cf +import pytest + +from dtest import Tester, create_ks, create_cf from tools.assertions import (assert_almost_equal, assert_bootstrap_state, assert_not_running, assert_one, assert_stderr_clean) from tools.data import query_c1c2 -from tools.decorators import no_vnodes, since from tools.intervention import InterruptBootstrap, KillOnBootstrap from tools.misc import new_node -from tools.misc import generate_ssl_stores - - -class BaseBootstrapTest(Tester): - __test__ = False - - allow_log_errors = True - ignore_log_patterns = ( - # This one occurs when trying to send the migration to a - # node that hasn't started yet, and when it does, it gets - # replayed and everything is fine. 
- r'Can\'t send migration request: node.*is down', - # ignore streaming error during bootstrap - r'Exception encountered during startup', - r'Streaming error occurred' - ) +from tools.misc import generate_ssl_stores, retry_till_success + +since = pytest.mark.since +logger = logging.getLogger(__name__) + +class TestBootstrap(Tester): + + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.allow_log_errors = True + fixture_dtest_setup.ignore_log_patterns = ( + # This one occurs when trying to send the migration to a + # node that hasn't started yet, and when it does, it gets + # replayed and everything is fine. + r'Can\'t send migration request: node.*is down', + # ignore streaming error during bootstrap + r'Exception encountered during startup', + r'Streaming error occurred' + ) def _base_bootstrap_test(self, bootstrap=None, bootstrap_from_version=None, enable_ssl=None): @@ -48,14 +54,14 @@ class BaseBootstrapTest(Tester): cluster = self.cluster if enable_ssl: - debug("***using internode ssl***") - generate_ssl_stores(self.test_path) - cluster.enable_internode_ssl(self.test_path) + logger.debug("***using internode ssl***") + generate_ssl_stores(self.fixture_dtest_setup.test_path) + cluster.enable_internode_ssl(self.fixture_dtest_setup.test_path) tokens = cluster.balanced_tokens(2) cluster.set_configuration_options(values={'num_tokens': 1}) - debug("[node1, node2] tokens: %r" % (tokens,)) + logger.debug("[node1, node2] tokens: %r" % (tokens,)) keys = 10000 @@ -63,7 +69,7 @@ class BaseBootstrapTest(Tester): cluster.populate(1) node1 = cluster.nodelist()[0] if bootstrap_from_version: - debug("starting source node on version {}".format(bootstrap_from_version)) + logger.debug("starting source node on version {}".format(bootstrap_from_version)) node1.set_install_dir(version=bootstrap_from_version) node1.set_configuration_options(values={'initial_token': tokens[0]}) cluster.start(wait_other_notice=True) 
@@ -74,7 +80,7 @@ class BaseBootstrapTest(Tester): # record the size before inserting any of our own data empty_size = node1.data_size() - debug("node1 empty size : %s" % float(empty_size)) + logger.debug("node1 empty size : %s" % float(empty_size)) insert_statement = session.prepare("INSERT INTO ks.cf (key, c1, c2) VALUES (?, 'value1', 'value2')") execute_concurrent_with_args(session, insert_statement, [['k%d' % k] for k in range(keys)]) @@ -82,25 +88,23 @@ class BaseBootstrapTest(Tester): node1.flush() node1.compact() initial_size = node1.data_size() - debug("node1 size before bootstrapping node2: %s" % float(initial_size)) + logger.debug("node1 size before bootstrapping node2: %s" % float(initial_size)) # Reads inserted data all during the bootstrap process. We shouldn't # get any error - reader = self.go(lambda _: query_c1c2(session, random.randint(0, keys - 1), ConsistencyLevel.ONE)) + query_c1c2(session, random.randint(0, keys - 1), ConsistencyLevel.ONE) + session.shutdown() # Bootstrapping a new node in the current version node2 = bootstrap(cluster, tokens[1]) node2.compact() - reader.check() node1.cleanup() - debug("node1 size after cleanup: %s" % float(node1.data_size())) + logger.debug("node1 size after cleanup: %s" % float(node1.data_size())) node1.compact() - debug("node1 size after compacting: %s" % float(node1.data_size())) - time.sleep(.5) - reader.check() + logger.debug("node1 size after compacting: %s" % float(node1.data_size())) - debug("node2 size after compacting: %s" % float(node2.data_size())) + logger.debug("node2 size after compacting: %s" % float(node2.data_size())) size1 = float(node1.data_size()) size2 = float(node2.data_size()) @@ -108,40 +112,34 @@ class BaseBootstrapTest(Tester): assert_almost_equal(float(initial_size - empty_size), 2 * (size1 - float(empty_size))) assert_bootstrap_state(self, node2, 'COMPLETED') - if bootstrap_from_version: - self.assertTrue(node2.grep_log('does not support keep-alive', filename='debug.log')) - -class 
TestBootstrap(BaseBootstrapTest): - __test__ = True - - @no_vnodes() - def simple_bootstrap_test_with_ssl(self): + @pytest.mark.no_vnodes + def test_simple_bootstrap_with_ssl(self): self._base_bootstrap_test(enable_ssl=True) - @no_vnodes() - def simple_bootstrap_test(self): + @pytest.mark.no_vnodes + def test_simple_bootstrap(self): self._base_bootstrap_test() - @no_vnodes() - def bootstrap_on_write_survey_test(self): + @pytest.mark.no_vnodes + def test_bootstrap_on_write_survey(self): def bootstrap_on_write_survey_and_join(cluster, token): node2 = new_node(cluster) node2.set_configuration_options(values={'initial_token': token}) node2.start(jvm_args=["-Dcassandra.write_survey=true"], wait_for_binary_proto=True) - self.assertTrue(len(node2.grep_log('Startup complete, but write survey mode is active, not becoming an active ring member.'))) + assert len(node2.grep_log('Startup complete, but write survey mode is active, not becoming an active ring member.')) assert_bootstrap_state(self, node2, 'IN_PROGRESS') node2.nodetool("join") - self.assertTrue(len(node2.grep_log('Leaving write survey mode and joining ring at operator request'))) + assert len(node2.grep_log('Leaving write survey mode and joining ring at operator request')) return node2 self._base_bootstrap_test(bootstrap_on_write_survey_and_join) @since('3.10') - @no_vnodes() - def simple_bootstrap_test_small_keepalive_period(self): + @pytest.mark.no_vnodes + def test_simple_bootstrap_small_keepalive_period(self): """ @jira_ticket CASSANDRA-11841 Test that bootstrap completes if it takes longer than streaming_socket_timeout_in_ms or @@ -157,7 +155,7 @@ class TestBootstrap(BaseBootstrapTest): cluster.populate(1) node1 = cluster.nodelist()[0] - debug("Setting up byteman on {}".format(node1.name)) + logger.debug("Setting up byteman on {}".format(node1.name)) # set up byteman node1.byteman_port = '8100' node1.import_config_files() @@ -169,7 +167,7 @@ class TestBootstrap(BaseBootstrapTest): 
'compaction(strategy=SizeTieredCompactionStrategy, enabled=false)']) cluster.flush() - debug("Submitting byteman script to {} to".format(node1.name)) + logger.debug("Submitting byteman script to {} to".format(node1.name)) # Sleep longer than streaming_socket_timeout_in_ms to make sure the node will not be killed node1.byteman_submit(['./byteman/stream_5s_sleep.btm']) @@ -181,16 +179,15 @@ class TestBootstrap(BaseBootstrapTest): assert_bootstrap_state(self, node2, 'COMPLETED') for node in cluster.nodelist(): - self.assertTrue(node.grep_log('Scheduling keep-alive task with 2s period.', filename='debug.log')) - self.assertTrue(node.grep_log('Sending keep-alive', filename='debug.log')) - self.assertTrue(node.grep_log('Received keep-alive', filename='debug.log')) + assert node.grep_log('Scheduling keep-alive task with 2s period.', filename='debug.log') + assert node.grep_log('Sending keep-alive', filename='debug.log') + assert node.grep_log('Received keep-alive', filename='debug.log') - def simple_bootstrap_test_nodata(self): + def test_simple_bootstrap_nodata(self): """ @jira_ticket CASSANDRA-11010 Test that bootstrap completes if streaming from nodes with no data """ - cluster = self.cluster # Create a two-node cluster cluster.populate(2) @@ -202,7 +199,7 @@ class TestBootstrap(BaseBootstrapTest): assert_bootstrap_state(self, node3, 'COMPLETED') - def read_from_bootstrapped_node_test(self): + def test_read_from_bootstrapped_node(self): """ Test bootstrapped node sees existing data @jira_ticket CASSANDRA-6648 @@ -223,18 +220,18 @@ class TestBootstrap(BaseBootstrapTest): session = self.patient_exclusive_cql_connection(node4) new_rows = list(session.execute("SELECT * FROM %s" % (stress_table,))) - self.assertEquals(original_rows, new_rows) + assert original_rows == new_rows - def consistent_range_movement_true_with_replica_down_should_fail_test(self): + def test_consistent_range_movement_true_with_replica_down_should_fail(self): 
self._bootstrap_test_with_replica_down(True) - def consistent_range_movement_false_with_replica_down_should_succeed_test(self): + def test_consistent_range_movement_false_with_replica_down_should_succeed(self): self._bootstrap_test_with_replica_down(False) - def consistent_range_movement_true_with_rf1_should_fail_test(self): + def test_consistent_range_movement_true_with_rf1_should_fail(self): self._bootstrap_test_with_replica_down(True, rf=1) - def consistent_range_movement_false_with_rf1_should_succeed_test(self): + def test_consistent_range_movement_false_with_rf1_should_succeed(self): self._bootstrap_test_with_replica_down(False, rf=1) def _bootstrap_test_with_replica_down(self, consistent_range_movement, rf=2): @@ -249,10 +246,10 @@ class TestBootstrap(BaseBootstrapTest): node3_token = None # Make token assignment deterministic - if DISABLE_VNODES: + if not self.dtest_config.use_vnodes: cluster.set_configuration_options(values={'num_tokens': 1}) tokens = cluster.balanced_tokens(3) - debug("non-vnode tokens: %r" % (tokens,)) + logger.debug("non-vnode tokens: %r" % (tokens,)) node1.set_configuration_options(values={'initial_token': tokens[0]}) node2.set_configuration_options(values={'initial_token': tokens[2]}) node3_token = tokens[1] # Add node 3 between node1 and node2 @@ -283,7 +280,7 @@ class TestBootstrap(BaseBootstrapTest): # with rf=1 and cassandra.consistent.rangemovement=false, missing sources are ignored if not consistent_range_movement and rf == 1: node3.watch_log_for("Unable to find sufficient sources for streaming range") - self.assertTrue(node3.is_running()) + assert node3.is_running() assert_bootstrap_state(self, node3, 'COMPLETED') else: if consistent_range_movement: @@ -293,11 +290,10 @@ class TestBootstrap(BaseBootstrapTest): assert_not_running(node3) @since('2.2') - def resumable_bootstrap_test(self): + def test_resumable_bootstrap(self): """ Test resuming bootstrap after data streaming failure """ - cluster = self.cluster cluster.populate(2) 
@@ -323,7 +319,7 @@ class TestBootstrap(BaseBootstrapTest): node3.watch_log_for("Starting listening for CQL clients") mark = node3.mark_log() # check if node3 is still in bootstrap mode - assert_bootstrap_state(self, node3, 'IN_PROGRESS') + retry_till_success(assert_bootstrap_state, tester=self, node=node3, expected_bootstrap_state='IN_PROGRESS', timeout=120) # bring back node1 and invoke nodetool bootstrap to resume bootstrapping node3.nodetool('bootstrap resume') @@ -334,17 +330,16 @@ class TestBootstrap(BaseBootstrapTest): # cleanup to guarantee each node will only have sstables of its ranges cluster.cleanup() - debug("Check data is present") + logger.debug("Check data is present") # Let's check stream bootstrap completely transferred data stdout, stderr, _ = node3.stress(['read', 'n=1k', 'no-warmup', '-schema', 'replication(factor=2)', '-rate', 'threads=8']) if stdout is not None: - self.assertNotIn("FAILURE", stdout) + assert "FAILURE" not in stdout.decode("utf-8") @since('2.2') - def bootstrap_with_reset_bootstrap_state_test(self): + def test_bootstrap_with_reset_bootstrap_state(self): """Test bootstrap with resetting bootstrap progress""" - cluster = self.cluster cluster.set_configuration_options(values={'stream_throughput_outbound_megabits_per_sec': 1}) cluster.populate(2).start(wait_other_notice=True) @@ -367,7 +362,7 @@ class TestBootstrap(BaseBootstrapTest): node1.start() # restart node3 bootstrap with resetting bootstrap progress - node3.stop() + node3.stop(signal_event=signal.SIGKILL) mark = node3.mark_log() node3.start(jvm_args=["-Dcassandra.reset_bootstrap_progress=true"]) # check if we reset bootstrap state @@ -378,7 +373,7 @@ class TestBootstrap(BaseBootstrapTest): # check if 2nd bootstrap succeeded assert_bootstrap_state(self, node3, 'COMPLETED') - def manual_bootstrap_test(self): + def test_manual_bootstrap(self): """ Test adding a new node and bootstrapping it manually. No auto_bootstrap. 
This test also verify that all data are OK after the addition of the new node. @@ -403,14 +398,13 @@ class TestBootstrap(BaseBootstrapTest): node1.cleanup() current_rows = list(session.execute("SELECT * FROM %s" % stress_table)) - self.assertEquals(original_rows, current_rows) + assert original_rows == current_rows - def local_quorum_bootstrap_test(self): + def test_local_quorum_bootstrap(self): """ Test that CL local_quorum works while a node is bootstrapping. @jira_ticket CASSANDRA-8058 """ - cluster = self.cluster cluster.populate([1, 1]) cluster.start() @@ -453,16 +447,16 @@ class TestBootstrap(BaseBootstrapTest): '-rate', 'threads=5', '-errors', 'retries=2']) - debug(out) + logger.debug(out) assert_stderr_clean(err) regex = re.compile("Operation.+error inserting key.+Exception") - failure = regex.search(out) - self.assertIsNone(failure, "Error during stress while bootstrapping") + failure = regex.search(str(out)) + assert failure is None, "Error during stress while bootstrapping" - def shutdown_wiped_node_cannot_join_test(self): + def test_shutdown_wiped_node_cannot_join(self): self._wiped_node_cannot_join_test(gently=True) - def killed_wiped_node_cannot_join_test(self): + def test_killed_wiped_node_cannot_join(self): self._wiped_node_cannot_join_test(gently=False) def _wiped_node_cannot_join_test(self, gently): @@ -490,7 +484,7 @@ class TestBootstrap(BaseBootstrapTest): node4.start(wait_for_binary_proto=True) session = self.patient_cql_connection(node4) - self.assertEquals(original_rows, list(session.execute("SELECT * FROM {}".format(stress_table,)))) + assert original_rows == list(session.execute("SELECT * FROM {}".format(stress_table,))) # Stop the new node and wipe its data node4.stop(gently=gently) @@ -500,7 +494,7 @@ class TestBootstrap(BaseBootstrapTest): node4.start(no_wait=True, wait_other_notice=False) node4.watch_log_for("A node with address {} already exists, cancelling join".format(node4.address_for_current_version_slashy()), from_mark=mark) - def 
decommissioned_wiped_node_can_join_test(self): + def test_decommissioned_wiped_node_can_join(self): """ @jira_ticket CASSANDRA-9765 Test that if we decommission a node and then wipe its data, it can join the cluster. @@ -523,7 +517,7 @@ class TestBootstrap(BaseBootstrapTest): node4.start(wait_for_binary_proto=True, wait_other_notice=True) session = self.patient_cql_connection(node4) - self.assertEquals(original_rows, list(session.execute("SELECT * FROM {}".format(stress_table,)))) + assert original_rows == list(session.execute("SELECT * FROM {}".format(stress_table,))) # Decommission the new node and wipe its data node4.decommission() @@ -534,7 +528,7 @@ class TestBootstrap(BaseBootstrapTest): node4.start(wait_other_notice=True) node4.watch_log_for("JOINING:", from_mark=mark) - def decommissioned_wiped_node_can_gossip_to_single_seed_test(self): + def test_decommissioned_wiped_node_can_gossip_to_single_seed(self): """ @jira_ticket CASSANDRA-8072 @jira_ticket CASSANDRA-8422 @@ -559,26 +553,26 @@ class TestBootstrap(BaseBootstrapTest): session.execute("ALTER KEYSPACE system_traces WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};") # Decommision the new node and kill it - debug("Decommissioning & stopping node2") + logger.debug("Decommissioning & stopping node2") node2.decommission() node2.stop(wait_other_notice=False) # Wipe its data for data_dir in node2.data_directories(): - debug("Deleting {}".format(data_dir)) + logger.debug("Deleting {}".format(data_dir)) shutil.rmtree(data_dir) commitlog_dir = os.path.join(node2.get_path(), 'commitlogs') - debug("Deleting {}".format(commitlog_dir)) + logger.debug("Deleting {}".format(commitlog_dir)) shutil.rmtree(commitlog_dir) # Now start it, it should be allowed to join mark = node2.mark_log() - debug("Restarting wiped node2") + logger.debug("Restarting wiped node2") node2.start(wait_other_notice=False) node2.watch_log_for("JOINING:", from_mark=mark) - def 
failed_bootstrap_wiped_node_can_join_test(self): + def test_failed_bootstrap_wiped_node_can_join(self): """ @jira_ticket CASSANDRA-9765 Test that if a node fails to bootstrap, it can join the cluster even if the data is wiped. @@ -607,7 +601,7 @@ class TestBootstrap(BaseBootstrapTest): node2.start() t.join() - self.assertFalse(node2.is_running()) + assert not node2.is_running() # wipe any data for node2 self._cleanup(node2) @@ -617,7 +611,7 @@ class TestBootstrap(BaseBootstrapTest): node2.watch_log_for("JOINING:", from_mark=mark) @since('2.1.1') - def simultaneous_bootstrap_test(self): + def test_simultaneous_bootstrap(self): """ Attempt to bootstrap two nodes at once, to assert the second bootstrapped node fails, and does not interfere. @@ -660,7 +654,7 @@ class TestBootstrap(BaseBootstrapTest): # Repeat the select count(*) query, to help catch # bugs like 9484, where count(*) fails at higher # data loads. - for _ in xrange(5): + for _ in range(5): assert_one(session, "SELECT count(*) from keyspace1.standard1", [500000], cl=ConsistencyLevel.ONE) def test_cleanup(self): @@ -673,7 +667,7 @@ class TestBootstrap(BaseBootstrapTest): cluster.populate(1) cluster.start(wait_for_binary_proto=True) node1, = cluster.nodelist() - for x in xrange(0, 5): + for x in range(0, 5): node1.stress(['write', 'n=100k', 'no-warmup', '-schema', 'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)', 'replication(factor=1)', '-rate', 'threads=10']) node1.flush() node2 = new_node(cluster) @@ -682,20 +676,21 @@ class TestBootstrap(BaseBootstrapTest): failed = threading.Event() jobs = 1 thread = threading.Thread(target=self._monitor_datadir, args=(node1, event, len(node1.get_sstables("keyspace1", "standard1")), jobs, failed)) + thread.setDaemon(True) thread.start() node1.nodetool("cleanup -j {} keyspace1 standard1".format(jobs)) event.set() thread.join() - self.assertFalse(failed.is_set()) + assert not failed.is_set() def _monitor_datadir(self, node, event, basecount, jobs, 
failed): while True: sstables = [s for s in node.get_sstables("keyspace1", "standard1") if "tmplink" not in s] - debug("---") + logger.debug("---") for sstable in sstables: - debug(sstable) + logger.debug(sstable) if len(sstables) > basecount + jobs: - debug("Current count is {}, basecount was {}".format(len(sstables), basecount)) + logger.debug("Current count is {}, basecount was {}".format(len(sstables), basecount)) failed.set() return if event.is_set(): @@ -705,6 +700,6 @@ class TestBootstrap(BaseBootstrapTest): def _cleanup(self, node): commitlog_dir = os.path.join(node.get_path(), 'commitlogs') for data_dir in node.data_directories(): - debug("Deleting {}".format(data_dir)) + logger.debug("Deleting {}".format(data_dir)) shutil.rmtree(data_dir) shutil.rmtree(commitlog_dir) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org