http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/repair_tests/repair_test.py ---------------------------------------------------------------------- diff --git a/repair_tests/repair_test.py b/repair_tests/repair_test.py index 238871f..59910e0 100644 --- a/repair_tests/repair_test.py +++ b/repair_tests/repair_test.py @@ -3,18 +3,21 @@ import os.path import threading import time import re +import pytest +import logging + from collections import namedtuple from threading import Thread -from unittest import skip, skipIf from cassandra import ConsistencyLevel from cassandra.query import SimpleStatement from ccmlib.node import ToolError -from nose.plugins.attrib import attr -from dtest import CASSANDRA_VERSION_FROM_BUILD, FlakyRetryPolicy, Tester, debug, create_ks, create_cf +from dtest import CASSANDRA_VERSION_FROM_BUILD, FlakyRetryPolicy, Tester, create_ks, create_cf from tools.data import insert_c1c2, query_c1c2 -from tools.decorators import no_vnodes, since + +since = pytest.mark.since +logger = logging.getLogger(__name__) def _repair_options(version, ks='', cf=None, sequential=True): @@ -46,7 +49,6 @@ def _repair_options(version, ks='', cf=None, sequential=True): class BaseRepairTest(Tester): - __test__ = False def check_rows_on_node(self, node_to_check, rows, found=None, missings=None, restart=True): """ @@ -64,14 +66,14 @@ class BaseRepairTest(Tester): missings = [] stopped_nodes = [] - for node in self.cluster.nodes.values(): + for node in list(self.cluster.nodes.values()): if node.is_running() and node is not node_to_check: stopped_nodes.append(node) node.stop(wait_other_notice=True) session = self.patient_exclusive_cql_connection(node_to_check, 'ks') - result = list(session.execute("SELECT * FROM cf LIMIT {}".format(rows * 2))) - self.assertEqual(len(result), rows) + result = list(session.execute("SELECT * FROM cf LIMIT {}".format(rows * 2), timeout=10)) + assert len(result) == rows for k in found: query_c1c2(session, k, ConsistencyLevel.ONE) @@ -79,7 +81,7 @@ class BaseRepairTest(Tester): for k in missings: query = SimpleStatement("SELECT c1, c2 FROM cf WHERE key='k{}'".format(k), consistency_level=ConsistencyLevel.ONE) res = list(session.execute(query)) - self.assertEqual(len(filter(lambda x: len(x) != 0, res)), 0, res) + assert len([x for x in res if len(x) != 0]) == 0, res if restart: for node in stopped_nodes: @@ -92,7 +94,7 @@ class BaseRepairTest(Tester): # interfere with the test (this must be after the populate) cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) cluster.set_batch_commitlog(enabled=True) - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate(3).start() node1, node2, node3 = cluster.nodelist() @@ -101,13 +103,13 @@ class BaseRepairTest(Tester): create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'}) # Insert 1000 keys, kill node 3, insert 1 key, restart node 3, insert 1000 more keys - debug("Inserting data...") + logger.debug("Inserting data...") insert_c1c2(session, n=1000, consistency=ConsistencyLevel.ALL) node3.flush() node3.stop(wait_other_notice=True) insert_c1c2(session, keys=(1000, ), consistency=ConsistencyLevel.TWO) node3.start(wait_other_notice=True, wait_for_binary_proto=True) - insert_c1c2(session, keys=range(1001, 2001), consistency=ConsistencyLevel.ALL) + insert_c1c2(session, keys=list(range(1001, 2001)), consistency=ConsistencyLevel.ALL) cluster.flush() @@ -116,46 +118,46 @@ class BaseRepairTest(Tester): node1, node2, node3 = cluster.nodelist() # 
Verify that node3 has only 2000 keys - debug("Checking data on node3...") + logger.debug("Checking data on node3...") self.check_rows_on_node(node3, 2000, missings=[1000]) # Verify that node1 has 2001 keys - debug("Checking data on node1...") + logger.debug("Checking data on node1...") self.check_rows_on_node(node1, 2001, found=[1000]) # Verify that node2 has 2001 keys - debug("Checking data on node2...") + logger.debug("Checking data on node2...") self.check_rows_on_node(node2, 2001, found=[1000]) time.sleep(10) # see CASSANDRA-4373 # Run repair start = time.time() - debug("starting repair...") + logger.debug("starting repair...") node1.repair(_repair_options(self.cluster.version(), ks='ks', sequential=sequential)) - debug("Repair time: {end}".format(end=time.time() - start)) + logger.debug("Repair time: {end}".format(end=time.time() - start)) # Validate that only one range was transfered out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex())) - self.assertEqual(len(out_of_sync_logs), 2, "Lines matching: " + str([elt[0] for elt in out_of_sync_logs])) + assert len(out_of_sync_logs) == 2, "Lines matching: " + str([elt[0] for elt in out_of_sync_logs]) valid_out_of_sync_pairs = [{node1.address(), node3.address()}, {node2.address(), node3.address()}] for line, m in out_of_sync_logs: num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), {m.group(1), m.group(2)} - self.assertEqual(int(num_out_of_sync_ranges), 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, line)) - self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, str(out_of_sync_nodes)) + assert int(num_out_of_sync_ranges) == 1, \ + "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges) + assert out_of_sync_nodes, valid_out_of_sync_pairs in str(out_of_sync_nodes) # Check node3 now has the key self.check_rows_on_node(node3, 2001, found=[1000], restart=False) class TestRepair(BaseRepairTest): - __test__ = True - @since('2.2.1', '4') - def no_anticompaction_after_dclocal_repair_test(self): + @since('2.2.1', max_version='4') + def test_no_anticompaction_after_dclocal_repair(self): """ * Launch a four node, two DC cluster * Start a -local repair on node1 in dc1 @@ -166,44 +168,44 @@ class TestRepair(BaseRepairTest): @jira_ticket CASSANDRA-10422 """ cluster = self.cluster - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate([2, 2]).start(wait_for_binary_proto=True) node1_1, node2_1, node1_2, node2_2 = cluster.nodelist() node1_1.stress(stress_options=['write', 'n=50K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=4)', '-rate', 'threads=50']) node1_1.nodetool("repair -local keyspace1 standard1") - self.assertTrue(node1_1.grep_log("Not a global repair")) - self.assertTrue(node2_1.grep_log("Not a global repair")) + assert node1_1.grep_log("Not a global repair") + assert node2_1.grep_log("Not a global repair") # dc2 should not see these messages: - self.assertFalse(node1_2.grep_log("Not a global repair")) - self.assertFalse(node2_2.grep_log("Not a global repair")) + assert not node1_2.grep_log("Not a global repair") + assert not node2_2.grep_log("Not a global repair") # and no nodes should do anticompaction: for node in cluster.nodelist(): - self.assertFalse(node.grep_log("Starting anticompaction")) + assert not node.grep_log("Starting anticompaction") - @skipIf(CASSANDRA_VERSION_FROM_BUILD == '3.9', "Test doesn't run on 3.9") - def 
nonexistent_table_repair_test(self): + @pytest.mark.skipif(CASSANDRA_VERSION_FROM_BUILD == '3.9', reason="Test doesn't run on 3.9") + def test_nonexistent_table_repair(self): """ * Check that repairing a non-existent table fails @jira_ticket CASSANDRA-12279 """ - self.ignore_log_patterns = [r'Unknown keyspace/cf pair'] + self.fixture_dtest_setup.ignore_log_patterns = [r'Unknown keyspace/cf pair'] cluster = self.cluster - debug('Starting nodes') + logger.debug('Starting nodes') cluster.populate(2).start(wait_for_binary_proto=True) node1, _ = cluster.nodelist() - debug('Creating keyspace and tables') + logger.debug('Creating keyspace and tables') node1.stress(stress_options=['write', 'n=1', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=2)', '-rate', 'threads=1']) - debug('Repairing non-existent table') + logger.debug('Repairing non-existent table') def repair_non_existent_table(): global nodetool_error try: node1.nodetool('repair keyspace1 standard2') - except Exception, e: + except Exception as e: nodetool_error = e # Launch in a external thread so it does not hang process @@ -211,20 +213,20 @@ class TestRepair(BaseRepairTest): t.start() t.join(timeout=60) - self.assertFalse(t.isAlive(), 'Repair thread on inexistent table is still running') + assert not t.is_alive(), 'Repair thread on inexistent table is still running' if self.cluster.version() >= '2.2': node1.watch_log_for("Unknown keyspace/cf pair", timeout=60) # Repair only finishes with error status after CASSANDRA-12508 on 3.0+ if self.cluster.version() >= '3.0': - self.assertTrue('nodetool_error' in globals() and isinstance(nodetool_error, ToolError), - 'Repair thread on inexistent table did not throw exception') - debug(nodetool_error.message) - self.assertTrue('Unknown keyspace/cf pair' in nodetool_error.message, - 'Repair thread on inexistent table did not detect inexistent table.') + assert 'nodetool_error' in globals() and isinstance(nodetool_error, ToolError), \ + 'Repair thread on inexistent table did not throw exception' + logger.debug(repr(nodetool_error)) + assert 'Unknown keyspace/cf pair' in repr(nodetool_error),\ + 'Repair thread on inexistent table did not detect inexistent table.' 
- @since('2.2.1', '4') - def no_anticompaction_after_hostspecific_repair_test(self): + @since('2.2.1', max_version='4') + def test_no_anticompaction_after_hostspecific_repair(self): """ * Launch a four node, two DC cluster * Start a repair on all nodes, by enumerating with -hosts @@ -234,18 +236,18 @@ class TestRepair(BaseRepairTest): @jira_ticket CASSANDRA-10422 """ cluster = self.cluster - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate([2, 2]).start(wait_for_binary_proto=True) node1_1, node2_1, node1_2, node2_2 = cluster.nodelist() node1_1.stress(stress_options=['write', 'n=100K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=4)', '-rate', 'threads=50']) node1_1.nodetool("repair -hosts 127.0.0.1,127.0.0.2,127.0.0.3,127.0.0.4 keyspace1 standard1") for node in cluster.nodelist(): - self.assertTrue(node.grep_log("Not a global repair")) + assert node.grep_log("Not a global repair") for node in cluster.nodelist(): - self.assertFalse(node.grep_log("Starting anticompaction")) + assert not node.grep_log("Starting anticompaction") - @since('2.2.4', '4') - def no_anticompaction_after_subrange_repair_test(self): + @since('2.2.4', max_version='4') + def test_no_anticompaction_after_subrange_repair(self): """ * Launch a three node, two DC cluster * Start a repair on a token range @@ -255,15 +257,15 @@ class TestRepair(BaseRepairTest): @jira_ticket CASSANDRA-10422 """ cluster = self.cluster - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate(3).start(wait_for_binary_proto=True) node1, node2, node3 = cluster.nodelist() node1.stress(stress_options=['write', 'n=50K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=3)', '-rate', 'threads=50']) node1.nodetool("repair -st 0 -et 1000 keyspace1 standard1") for node in cluster.nodelist(): - self.assertTrue(node.grep_log("Not a global repair")) + assert node.grep_log("Not a global repair") for node in cluster.nodelist(): - self.assertFalse(node.grep_log("Starting anticompaction")) + assert not node.grep_log("Starting anticompaction") def _get_repaired_data(self, node, keyspace): """ @@ -276,17 +278,17 @@ class TestRepair(BaseRepairTest): out = node.run_sstablemetadata(keyspace=keyspace).stdout def matches(pattern): - return filter(None, [pattern.match(l) for l in out.split('\n')]) + return [_f for _f in [pattern.match(l) for l in out.decode("utf-8").split('\n')] if _f] names = [m.group(1) for m in matches(_sstable_name)] repaired_times = [int(m.group(1)) for m in matches(_repaired_at)] - self.assertTrue(names) - self.assertTrue(repaired_times) + assert names + assert repaired_times return [_sstable_data(*a) for a in zip(names, repaired_times)] - @since('2.2.10', '4') - def no_anticompaction_of_already_repaired_test(self): + @since('2.2.10', max_version='4') + def test_no_anticompaction_of_already_repaired(self): """ * Launch three node cluster and stress with RF2 * Do incremental repair to have all sstables flagged as repaired @@ -294,9 +296,8 @@ class TestRepair(BaseRepairTest): * Verify that none of the already repaired sstables have been anti-compacted again @jira_ticket CASSANDRA-13153 """ - cluster = self.cluster - debug("Starting cluster..") + logger.debug("Starting cluster..") # disable JBOD conf since the test expects sstables to be on the same disk cluster.set_datadir_count(1) cluster.populate(3).start(wait_for_binary_proto=True) @@ -309,7 +310,7 @@ class TestRepair(BaseRepairTest): node1.nodetool("repair keyspace1 standard1") meta = self._get_repaired_data(node1, 
'keyspace1') repaired = set([m for m in meta if m.repaired > 0]) - self.assertEquals(len(repaired), len(meta)) + assert len(repaired) == len(meta) # stop node2, stress and start full repair to find out how synced ranges affect repairedAt values node2.stop(wait_other_notice=True) @@ -320,10 +321,10 @@ class TestRepair(BaseRepairTest): meta = self._get_repaired_data(node1, 'keyspace1') repairedAfterFull = set([m for m in meta if m.repaired > 0]) # already repaired sstables must remain untouched - self.assertEquals(repaired.intersection(repairedAfterFull), repaired) + assert repaired.intersection(repairedAfterFull) == repaired @since('2.2.1', '4') - def anticompaction_after_normal_repair_test(self): + def test_anticompaction_after_normal_repair(self): """ * Launch a four node, two DC cluster * Start a normal repair @@ -331,80 +332,80 @@ class TestRepair(BaseRepairTest): @jira_ticket CASSANDRA-10422 """ cluster = self.cluster - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate([2, 2]).start(wait_for_binary_proto=True) node1_1, node2_1, node1_2, node2_2 = cluster.nodelist() node1_1.stress(stress_options=['write', 'n=50K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=4)']) node1_1.nodetool("repair keyspace1 standard1") for node in cluster.nodelist(): - self.assertTrue("Starting anticompaction") + assert "Starting anticompaction" - def simple_sequential_repair_test(self): + def test_simple_sequential_repair(self): """ Calls simple repair test with a sequential repair """ self._simple_repair(sequential=True) - def simple_parallel_repair_test(self): + def test_simple_parallel_repair(self): """ Calls simple repair test with a parallel repair """ self._simple_repair(sequential=False) - def empty_vs_gcable_sequential_repair_test(self): + def test_empty_vs_gcable_sequential_repair(self): """ Calls empty_vs_gcable repair test with a sequential repair """ self._empty_vs_gcable_no_repair(sequential=True) - def empty_vs_gcable_parallel_repair_test(self): + def test_empty_vs_gcable_parallel_repair(self): """ Calls empty_vs_gcable repair test with a parallel repair """ self._empty_vs_gcable_no_repair(sequential=False) - def range_tombstone_digest_sequential_repair_test(self): + def test_range_tombstone_digest_sequential_repair(self): """ Calls range_tombstone_digest with a sequential repair """ self._range_tombstone_digest(sequential=True) - def range_tombstone_digest_parallel_repair_test(self): + def test_range_tombstone_digest_parallel_repair(self): """ Calls range_tombstone_digest with a parallel repair """ self._range_tombstone_digest(sequential=False) @since('2.1') - def shadowed_cell_digest_sequential_repair_test(self): + def test_shadowed_cell_digest_sequential_repair(self): """ Calls _cell_shadowed_by_range_tombstone with sequential repair """ self._cell_shadowed_by_range_tombstone(sequential=True) @since('2.1') - def shadowed_cell_digest_parallel_repair_test(self): + def test_shadowed_cell_digest_parallel_repair(self): """ Calls _cell_shadowed_by_range_tombstone with parallel repair """ self._cell_shadowed_by_range_tombstone(sequential=False) @since('3.0') - def shadowed_range_tombstone_digest_sequential_repair_test(self): + def test_shadowed_range_tombstone_digest_sequential_repair(self): """ Calls _range_tombstone_shadowed_by_range_tombstone with sequential repair """ self._range_tombstone_shadowed_by_range_tombstone(sequential=True) @since('3.0') - def shadowed_range_tombstone_digest_parallel_repair_test(self): + def 
test_shadowed_range_tombstone_digest_parallel_repair(self): """ Calls _range_tombstone_shadowed_by_range_tombstone with parallel repair """ self._range_tombstone_shadowed_by_range_tombstone(sequential=False) - @no_vnodes() - def simple_repair_order_preserving_test(self): + @pytest.mark.no_vnodes + def test_simple_repair_order_preserving(self): """ Calls simple repair test with OPP and sequential repair @jira_ticket CASSANDRA-5220 @@ -484,18 +485,18 @@ class TestRepair(BaseRepairTest): node2.stop(wait_other_notice=True) for cf in ['cf1', 'cf2']: # insert some data - for i in xrange(0, 10): - for j in xrange(0, 1000): + for i in range(0, 10): + for j in range(0, 1000): query = SimpleStatement("INSERT INTO {} (key, c1, c2) VALUES ('k{}', 'v{}', 'value')".format(cf, i, j), consistency_level=ConsistencyLevel.ONE) session.execute(query) node1.flush() # delete those data, half with row tombstone, and the rest with cell range tombstones - for i in xrange(0, 5): + for i in range(0, 5): query = SimpleStatement("DELETE FROM {} WHERE key='k{}'".format(cf, i), consistency_level=ConsistencyLevel.ONE) session.execute(query) node1.flush() - for i in xrange(5, 10): - for j in xrange(0, 1000): + for i in range(5, 10): + for j in range(0, 1000): query = SimpleStatement("DELETE FROM {} WHERE key='k{}' AND c1='v{}'".format(cf, i, j), consistency_level=ConsistencyLevel.ONE) session.execute(query) node1.flush() @@ -509,17 +510,17 @@ class TestRepair(BaseRepairTest): # check no rows will be returned for cf in ['cf1', 'cf2']: - for i in xrange(0, 10): + for i in range(0, 10): query = SimpleStatement("SELECT c1, c2 FROM {} WHERE key='k{}'".format(cf, i), consistency_level=ConsistencyLevel.ALL) res = list(session.execute(query)) - self.assertEqual(len(filter(lambda x: len(x) != 0, res)), 0, res) + assert len([x for x in res if len(x) != 0]) == 0, res # check log for no repair happened for gcable data out_of_sync_logs = node2.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync for cf1".format(cluster.address_regex(), cluster.address_regex())) - self.assertEqual(len(out_of_sync_logs), 0, "GC-able data does not need to be repaired with empty data: " + str([elt[0] for elt in out_of_sync_logs])) + assert len(out_of_sync_logs) == 0, "GC-able data does not need to be repaired with empty data: " + str([elt[0] for elt in out_of_sync_logs]) # check log for actual repair for non gcable data out_of_sync_logs = node2.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync for cf2".format(cluster.address_regex(), cluster.address_regex())) - self.assertGreater(len(out_of_sync_logs), 0, "Non GC-able data should be repaired") + assert len(out_of_sync_logs) > 0, "Non GC-able data should be repaired" def _range_tombstone_digest(self, sequential): """ @@ -595,9 +596,9 @@ class TestRepair(BaseRepairTest): # check log for no repair happened for gcable data out_of_sync_logs = node2.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync for table1".format(cluster.address_regex(), cluster.address_regex())) - self.assertEqual(len(out_of_sync_logs), 0, "Digest mismatch for range tombstone: {}".format(str([elt[0] for elt in out_of_sync_logs]))) + assert len(out_of_sync_logs) == 0, "Digest mismatch for range tombstone: {}".format(str([elt[0] for elt in out_of_sync_logs])) - def local_dc_repair_test(self): + def test_local_dc_repair(self): """ * Set up a multi DC cluster * Perform a -local repair on one DC @@ -607,25 +608,25 @@ class TestRepair(BaseRepairTest): node1 = cluster.nodes["node1"] node2 = cluster.nodes["node2"] - 
debug("starting repair...") + logger.debug("starting repair...") opts = ["-local"] opts += _repair_options(self.cluster.version(), ks="ks") node1.repair(opts) # Verify that only nodes in dc1 are involved in repair out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex())) - self.assertEqual(len(out_of_sync_logs), 1, "Lines matching: {}".format(len(out_of_sync_logs))) + assert len(out_of_sync_logs) == 1, "Lines matching: {}".format(len(out_of_sync_logs)) line, m = out_of_sync_logs[0] num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), {m.group(1), m.group(2)} - self.assertEqual(int(num_out_of_sync_ranges), 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, line)) + assert int(num_out_of_sync_ranges) == 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges) valid_out_of_sync_pairs = {node1.address(), node2.address()} - self.assertEqual(out_of_sync_nodes, valid_out_of_sync_pairs, "Unrelated node found in local repair: {}, expected {}".format(out_of_sync_nodes, valid_out_of_sync_pairs)) + assert out_of_sync_nodes == valid_out_of_sync_pairs, "Unrelated node found in local repair: {}, expected {}".format(out_of_sync_nodes, valid_out_of_sync_pairs) # Check node2 now has the key self.check_rows_on_node(node2, 2001, found=[1000], restart=False) - def dc_repair_test(self): + def test_dc_repair(self): """ * Set up a multi DC cluster * Perform a -dc repair on two dc's @@ -636,26 +637,26 @@ class TestRepair(BaseRepairTest): node2 = cluster.nodes["node2"] node3 = cluster.nodes["node3"] - debug("starting repair...") + logger.debug("starting repair...") opts = ["-dc", "dc1", "-dc", "dc2"] opts += _repair_options(self.cluster.version(), ks="ks") node1.repair(opts) # Verify that only nodes in dc1 and dc2 are involved in repair out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex())) - self.assertEqual(len(out_of_sync_logs), 2, "Lines matching: " + str([elt[0] for elt in out_of_sync_logs])) + assert len(out_of_sync_logs) == 2, "Lines matching: " + str([elt[0] for elt in out_of_sync_logs]) valid_out_of_sync_pairs = [{node1.address(), node2.address()}, {node2.address(), node3.address()}] for line, m in out_of_sync_logs: num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), {m.group(1), m.group(2)} - self.assertEqual(int(num_out_of_sync_ranges), 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, line)) - self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, str(out_of_sync_nodes)) + assert int(num_out_of_sync_ranges) == 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes , num_out_of_sync_ranges) + assert out_of_sync_nodes, valid_out_of_sync_pairs in str(out_of_sync_nodes) # Check node2 now has the key self.check_rows_on_node(node2, 2001, found=[1000], restart=False) - def dc_parallel_repair_test(self): + def test_dc_parallel_repair(self): """ * Set up a multi DC cluster * Perform a -dc repair on two dc's, with -dcpar @@ -666,30 +667,30 @@ class TestRepair(BaseRepairTest): node2 = cluster.nodes["node2"] node3 = cluster.nodes["node3"] - debug("starting repair...") + logger.debug("starting repair...") opts = ["-dc", "dc1", "-dc", "dc2", "-dcpar"] opts += _repair_options(self.cluster.version(), ks="ks", sequential=False) node1.repair(opts) # Verify that only nodes in dc1 and dc2 are involved in repair 
out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex())) - self.assertEqual(len(out_of_sync_logs), 2, "Lines matching: " + str([elt[0] for elt in out_of_sync_logs])) + assert len(out_of_sync_logs) == 2, "Lines matching: " + str([elt[0] for elt in out_of_sync_logs]) valid_out_of_sync_pairs = [{node1.address(), node2.address()}, {node2.address(), node3.address()}] for line, m in out_of_sync_logs: num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), {m.group(1), m.group(2)} - self.assertEqual(int(num_out_of_sync_ranges), 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, line)) - self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, str(out_of_sync_nodes)) + assert int(num_out_of_sync_ranges) == 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges) + assert out_of_sync_nodes, valid_out_of_sync_pairs in str(out_of_sync_nodes) # Check node2 now has the key self.check_rows_on_node(node2, 2001, found=[1000], restart=False) # Check the repair was a dc parallel repair if self.cluster.version() >= '2.2': - self.assertEqual(len(node1.grep_log('parallelism: dc_parallel')), 1, str(node1.grep_log('parallelism'))) + assert len(node1.grep_log('parallelism: dc_parallel')) == 1, str(node1.grep_log('parallelism')) else: - self.assertEqual(len(node1.grep_log('parallelism=PARALLEL')), 1, str(node1.grep_log('parallelism'))) + assert len(node1.grep_log('parallelism=PARALLEL')) == 1, str(node1.grep_log('parallelism')) def _setup_multi_dc(self): """ @@ -702,7 +703,7 @@ class TestRepair(BaseRepairTest): # interfer with the test (this must be after the populate) cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) cluster.set_batch_commitlog(enabled=True) - debug("Starting cluster..") + logger.debug("Starting cluster..") # populate 2 nodes in dc1, and one node each in dc2 and dc3 cluster.populate([2, 1, 1]).start(wait_for_binary_proto=True) @@ -713,19 +714,19 @@ class TestRepair(BaseRepairTest): create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'}) # Insert 1000 keys, kill node 2, insert 1 key, restart node 2, insert 1000 more keys - debug("Inserting data...") + logger.debug("Inserting data...") insert_c1c2(session, n=1000, consistency=ConsistencyLevel.ALL) node2.flush() node2.stop(wait_other_notice=True) insert_c1c2(session, keys=(1000, ), consistency=ConsistencyLevel.THREE) node2.start(wait_for_binary_proto=True, wait_other_notice=True) node1.watch_log_for_alive(node2) - insert_c1c2(session, keys=range(1001, 2001), consistency=ConsistencyLevel.ALL) + insert_c1c2(session, keys=list(range(1001, 2001)), consistency=ConsistencyLevel.ALL) cluster.flush() # Verify that only node2 has only 2000 keys and others have 2001 keys - debug("Checking data...") + logger.debug("Checking data...") self.check_rows_on_node(node2, 2000, missings=[1000]) for node in [node1, node3, node4]: self.check_rows_on_node(node, 2001, found=[1000]) @@ -739,7 +740,7 @@ class TestRepair(BaseRepairTest): Tests that multiple parallel repairs on the same table isn't causing reference leaks. 
""" - self.ignore_log_patterns = [ + self.fixture_dtest_setup.ignore_log_patterns = [ "Cannot start multiple repair sessions over the same sstables", # The message we are expecting "Validation failed in", # Expecting validation to fail "RMI Runtime", # JMX Repair failures @@ -749,7 +750,7 @@ class TestRepair(BaseRepairTest): ] cluster = self.cluster - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate([3]).start(wait_for_binary_proto=True) node1, node2, node3 = cluster.nodelist() node1.stress(stress_options=['write', 'n=10k', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=3)', '-rate', 'threads=50']) @@ -771,10 +772,10 @@ class TestRepair(BaseRepairTest): if len(node.grep_log("Cannot start multiple repair sessions over the same sstables")) > 0: found_message = True break - self.assertTrue(found_message) + assert found_message - @no_vnodes() - def token_range_repair_test(self): + @pytest.mark.no_vnodes + def test_token_range_repair(self): """ Test repair using the -st and -et options * Launch a three node cluster @@ -786,15 +787,15 @@ class TestRepair(BaseRepairTest): cluster = self.cluster cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) cluster.set_batch_commitlog(enabled=True) - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate(3).start(wait_for_binary_proto=True) node1, node2, node3 = cluster.nodelist() self._parameterized_range_repair(repair_opts=['-st', str(node3.initial_token), '-et', str(node1.initial_token)]) - @no_vnodes() - def token_range_repair_test_with_cf(self): + @pytest.mark.no_vnodes + def test_token_range_repair_with_cf(self): """ @jira_ticket CASSANDRA-11866 @@ -810,13 +811,13 @@ class TestRepair(BaseRepairTest): cluster = self.cluster cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) cluster.set_batch_commitlog(enabled=True) - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate(3).start(wait_for_binary_proto=True) node1, node2, node3 = cluster.nodelist() # Insert data, kill node 2, insert more data, restart node 2, insert another set of data - debug("Inserting data...") + logger.debug("Inserting data...") node1.stress(['write', 'n=1k', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate', 'threads=30']) node2.flush() node2.stop(wait_other_notice=True) @@ -829,25 +830,25 @@ class TestRepair(BaseRepairTest): opts = ['-st', str(node3.initial_token), '-et', str(node1.initial_token), ] opts += _repair_options(self.cluster.version(), ks='keyspace1', cf='counter1', sequential=False) node1.repair(opts) - self.assertEqual(len(node1.grep_log('are consistent for standard1')), 0, "Nodes 1 and 2 should not be consistent.") - self.assertEqual(len(node3.grep_log('Repair command')), 0, "Node 3 should not have been involved in the repair.") + assert len(node1.grep_log('are consistent for standard1')) == 0, "Nodes 1 and 2 should not be consistent." + assert len(node3.grep_log('Repair command')) == 0, "Node 3 should not have been involved in the repair." 
out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex())) - self.assertEqual(len(out_of_sync_logs), 0, "We repaired the wrong CF, so things should still be broke") + assert len(out_of_sync_logs) == 0, "We repaired the wrong CF == so things should still be broke" # Repair only the range node 1 owns on the right CF, assert everything is fixed opts = ['-st', str(node3.initial_token), '-et', str(node1.initial_token), ] opts += _repair_options(self.cluster.version(), ks='keyspace1', cf='standard1', sequential=False) node1.repair(opts) - self.assertEqual(len(node1.grep_log('are consistent for standard1')), 0, "Nodes 1 and 2 should not be consistent.") - self.assertEqual(len(node3.grep_log('Repair command')), 0, "Node 3 should not have been involved in the repair.") + assert len(node1.grep_log('are consistent for standard1')) == 0, "Nodes 1 and 2 should not be consistent." + assert len(node3.grep_log('Repair command')) == 0, "Node 3 should not have been involved in the repair." out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex())) _, matches = out_of_sync_logs[0] out_of_sync_nodes = {matches.group(1), matches.group(2)} valid_out_of_sync_pairs = [{node1.address(), node2.address()}] - self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, str(out_of_sync_nodes)) + assert out_of_sync_nodes, valid_out_of_sync_pairs in str(out_of_sync_nodes) - @no_vnodes() - def partitioner_range_repair_test(self): + @pytest.mark.no_vnodes + def test_partitioner_range_repair(self): """ Test repair using the -pr option * Launch a three node cluster @@ -859,7 +860,7 @@ class TestRepair(BaseRepairTest): cluster = self.cluster cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) cluster.set_batch_commitlog(enabled=True) - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate(3).start(wait_for_binary_proto=True) node1, node2, node3 = cluster.nodelist() @@ -867,8 +868,8 @@ class TestRepair(BaseRepairTest): self._parameterized_range_repair(repair_opts=['-pr']) @since('3.10') - @no_vnodes() - def pull_repair_test(self): + @pytest.mark.no_vnodes + def test_pull_repair(self): """ Test repair using the --pull option @jira_ticket CASSANDRA-9876 @@ -883,7 +884,7 @@ class TestRepair(BaseRepairTest): cluster = self.cluster cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) cluster.set_batch_commitlog(enabled=True) - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate(3).start(wait_for_binary_proto=True) node1, node2, node3 = cluster.nodelist() @@ -894,14 +895,14 @@ class TestRepair(BaseRepairTest): self._parameterized_range_repair(repair_opts=['--pull', '--in-hosts', node1_address + ',' + node2_address, '-st', str(node3.initial_token), '-et', str(node1.initial_token)]) # Node 1 should only receive files (as we ran a pull repair on node1) - self.assertTrue(len(node1.grep_log("Receiving [1-9][0-9]* files")) > 0) - self.assertEqual(len(node1.grep_log("sending [1-9][0-9]* files")), 0) - self.assertTrue(len(node1.grep_log("sending 0 files")) > 0) + assert len(node1.grep_log("Receiving [1-9][0-9]* files")) > 0 + assert len(node1.grep_log("sending [1-9][0-9]* files")) == 0 + assert len(node1.grep_log("sending 0 files")) > 0 # Node 2 should only send files (as we ran a pull repair on node1) - self.assertEqual(len(node2.grep_log("Receiving [1-9][0-9]* files")), 0) - 
self.assertTrue(len(node2.grep_log("Receiving 0 files")) > 0) - self.assertTrue(len(node2.grep_log("sending [1-9][0-9]* files")) > 0) + assert len(node2.grep_log("Receiving [1-9][0-9]* files")) == 0 + assert len(node2.grep_log("Receiving 0 files")) > 0 + assert len(node2.grep_log("sending [1-9][0-9]* files")) > 0 def _parameterized_range_repair(self, repair_opts): """ @@ -916,7 +917,7 @@ class TestRepair(BaseRepairTest): node1, node2, node3 = cluster.nodelist() # Insert data, kill node 2, insert more data, restart node 2, insert another set of data - debug("Inserting data...") + logger.debug("Inserting data...") node1.stress(['write', 'n=20K', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate', 'threads=30']) node2.flush() @@ -934,8 +935,8 @@ class TestRepair(BaseRepairTest): opts += _repair_options(self.cluster.version(), ks='keyspace1', cf='standard1', sequential=False) node1.repair(opts) - self.assertEqual(len(node1.grep_log('are consistent for standard1')), 0, "Nodes 1 and 2 should not be consistent.") - self.assertEqual(len(node3.grep_log('Repair command')), 0, "Node 3 should not have been involved in the repair.") + assert len(node1.grep_log('are consistent for standard1')) == 0, "Nodes 1 and 2 should not be consistent." + assert len(node3.grep_log('Repair command')) == 0, "Node 3 should not have been involved in the repair." out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex())) _, matches = out_of_sync_logs[0] @@ -943,10 +944,10 @@ class TestRepair(BaseRepairTest): valid_out_of_sync_pairs = [{node1.address(), node2.address()}] - self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, str(out_of_sync_nodes)) + assert out_of_sync_nodes, valid_out_of_sync_pairs in str(out_of_sync_nodes) @since('2.2') - def trace_repair_test(self): + def test_trace_repair(self): """ * Launch a three node cluster * Insert some data at RF 2 @@ -957,12 +958,12 @@ class TestRepair(BaseRepairTest): cluster = self.cluster cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) cluster.set_batch_commitlog(enabled=True) - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate(3).start(wait_for_binary_proto=True) node1, node2, node3 = cluster.nodelist() - debug("Inserting data...") + logger.debug("Inserting data...") node1.stress(['write', 'n=20K', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate', 'threads=30']) node2.flush() @@ -984,12 +985,11 @@ class TestRepair(BaseRepairTest): rows = list(session.execute("SELECT activity FROM system_traces.events")) # This check assumes that the only (or at least first) thing to write to `system_traces.events.activity` is # the repair task triggered in the test. - self.assertIn('job threads: {}'.format(job_thread_count), - rows[0][0], - 'Expected {} job threads in repair options. Instead we saw {}'.format(job_thread_count, rows[0][0])) + assert 'job threads: {}'.format(job_thread_count) in rows[0][0], \ + 'Expected {} job threads in repair options. 
Instead we saw {}'.format(job_thread_count, rows[0][0]) @since('2.2') - def thread_count_repair_test(self): + def test_thread_count_repair(self): """ * Launch a three node cluster * Insert some data at RF 2 @@ -1001,14 +1001,14 @@ class TestRepair(BaseRepairTest): cluster = self.cluster cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) cluster.set_batch_commitlog(enabled=True) - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate(3).start(wait_for_binary_proto=True) node1, node2, node3 = cluster.nodelist() # Valid job thread counts: 1, 2, 3, and 4 for job_thread_count in range(1, 5): - debug("Inserting data...") + logger.debug("Inserting data...") node1.stress(['write', 'n=2K', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate', 'threads=30', '-pop', 'seq={}..{}K'.format(2 * (job_thread_count - 1), 2 * job_thread_count)]) @@ -1032,11 +1032,10 @@ class TestRepair(BaseRepairTest): rows = list(session.execute("SELECT activity FROM system_traces.events")) # This check assumes that the only (or at least first) thing to write to `system_traces.events.activity` is # the repair task triggered in the test. - self.assertIn('job threads: {}'.format(job_thread_count), - rows[0][0], - 'Expected {} job threads in repair options. Instead we saw {}'.format(job_thread_count, rows[0][0])) + assert 'job threads: {}'.format(job_thread_count) in rows[0][0], \ + 'Expected {} job threads in repair options. Instead we saw {}'.format(job_thread_count, rows[0][0]) - @no_vnodes() + @pytest.mark.no_vnodes def test_multiple_concurrent_repairs(self): """ @jira_ticket CASSANDRA-11451 @@ -1061,7 +1060,7 @@ class TestRepair(BaseRepairTest): node1.stop(wait_other_notice=True) node3.stop(wait_other_notice=True) _, _, rc = node2.stress(['read', 'n=1M', 'no-warmup', '-rate', 'threads=30'], whitelist=True) - self.assertEqual(rc, 0) + assert rc == 0 @since('4.0') def test_wide_row_repair(self): @@ -1075,7 +1074,7 @@ class TestRepair(BaseRepairTest): node1, node2 = cluster.nodelist() node2.stop(wait_other_notice=True) profile_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml') - print("yaml = " + profile_path) + logger.info(("yaml = " + profile_path)) node1.stress(['user', 'profile=' + profile_path, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8', '-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)']) node2.start(wait_for_binary_proto=True) @@ -1101,12 +1100,12 @@ class TestRepair(BaseRepairTest): node1.watch_log_for('requesting merkle trees', filename='system.log') time.sleep(2) - debug("stopping node1") + logger.debug("stopping node1") node1.stop(gently=False, wait_other_notice=True) t1.join() - debug("starting node1 - first repair should have failed") + logger.debug("starting node1 - first repair should have failed") node1.start(wait_for_binary_proto=True, wait_other_notice=True) - debug("running second repair") + logger.debug("running second repair") if cluster.version() >= "2.2": node1.repair() else: @@ -1126,7 +1125,7 @@ class TestRepair(BaseRepairTest): """ self._test_failure_during_repair(phase='sync', initiator=False,) - @since('2.2', '4') + @since('2.2', max_version='4') def test_failure_during_anticompaction(self): """ @jira_ticket CASSANDRA-12901 @@ -1144,48 +1143,49 @@ class TestRepair(BaseRepairTest): cluster = self.cluster # We are not interested in specific errors, but # that the repair session finishes on node failure without hanging - self.ignore_log_patterns = [ + 
self.fixture_dtest_setup.ignore_log_patterns = [ "Endpoint .* died", "Streaming error occurred", "StreamReceiveTask", "Stream failed", "Session completed with the following error", "Repair session .* for range .* failed with error", - "Sync failed between .* and .*" + "Sync failed between .* and .*", + "failed to send a stream message/file to peer" ] # Disable hinted handoff and set batch commit log so this doesn't # interfere with the test (this must be after the populate) cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) cluster.set_batch_commitlog(enabled=True) - debug("Setting up cluster..") + logger.debug("Setting up cluster..") cluster.populate(3) node1, node2, node3 = cluster.nodelist() node_to_kill = node2 if (phase == 'sync' and initiator) else node3 - debug("Setting up byteman on {}".format(node_to_kill.name)) + logger.debug("Setting up byteman on {}".format(node_to_kill.name)) # set up byteman node_to_kill.byteman_port = '8100' node_to_kill.import_config_files() - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.start(wait_other_notice=True) - debug("stopping node3") + logger.debug("stopping node3") node3.stop(gently=False, wait_other_notice=True) self.patient_exclusive_cql_connection(node1) - debug("inserting data while node3 is down") + logger.debug("inserting data while node3 is down") node1.stress(stress_options=['write', 'n=1k', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=3)', '-rate', 'threads=10']) - debug("bring back node3") + logger.debug("bring back node3") node3.start(wait_other_notice=True, wait_for_binary_proto=True) script = 'stream_sleep.btm' if phase == 'sync' else 'repair_{}_sleep.btm'.format(phase) - debug("Submitting byteman script to {}".format(node_to_kill.name)) + logger.debug("Submitting byteman script to {}".format(node_to_kill.name)) # Sleep on anticompaction/stream so there will be time for node to be killed node_to_kill.byteman_submit(['./byteman/{}'.format(script)]) @@ -1193,15 +1193,15 @@ class TestRepair(BaseRepairTest): global nodetool_error try: node1.nodetool('repair keyspace1 standard1') - except Exception, e: + except Exception as e: nodetool_error = e - debug("repair node1") + logger.debug("repair node1") # Launch in a external thread so it does not hang process t = Thread(target=node1_repair) t.start() - debug("Will kill {} in middle of {}".format(node_to_kill.name, phase)) + logger.debug("Will kill {} in middle of {}".format(node_to_kill.name, phase)) msg_to_wait = 'streaming plan for Repair' if phase == 'anticompaction': msg_to_wait = 'Got anticompaction request' @@ -1210,10 +1210,10 @@ class TestRepair(BaseRepairTest): node_to_kill.watch_log_for(msg_to_wait, filename='debug.log') node_to_kill.stop(gently=False, wait_other_notice=True) - debug("Killed {}, now waiting repair to finish".format(node_to_kill.name)) + logger.debug("Killed {}, now waiting repair to finish".format(node_to_kill.name)) t.join(timeout=60) - self.assertFalse(t.isAlive(), 'Repair still running after sync {} was killed' - .format("initiator" if initiator else "participant")) + assert not t.is_alive(), 'Repair still running after sync {} was killed'\ + .format("initiator" if initiator else "participant") if cluster.version() < '4.0' or phase != 'sync': # the log entry we're watching for in the sync task came from the @@ -1227,7 +1227,7 @@ RepairTableContents = namedtuple('RepairTableContents', @since('2.2') -@attr("resource-intensive") +@pytest.mark.resource_intensive class TestRepairDataSystemTable(Tester): """ 
@jira_ticket CASSANDRA-5839 @@ -1237,21 +1237,19 @@ class TestRepairDataSystemTable(Tester): to a cluster, then ensuring these tables are in valid states before and after running repair. """ - - def setUp(self): + @pytest.fixture(scope='function', autouse=True) + def fixture_set_cluster_settings(self, fixture_dtest_setup): """ Prepares a cluster for tests of the repair history tables by starting a 5-node cluster, then inserting 5000 values with RF=3. """ - - Tester.setUp(self) - self.cluster.populate(5).start(wait_for_binary_proto=True) + fixture_dtest_setup.cluster.populate(5).start(wait_for_binary_proto=True) self.node1 = self.cluster.nodelist()[0] - self.session = self.patient_cql_connection(self.node1) + self.session = fixture_dtest_setup.patient_cql_connection(self.node1) self.node1.stress(stress_options=['write', 'n=5K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=3)']) - self.cluster.flush() + fixture_dtest_setup.cluster.flush() def repair_table_contents(self, node, include_system_keyspaces=True): """ @@ -1281,15 +1279,15 @@ class TestRepairDataSystemTable(Tester): return RepairTableContents(parent_repair_history=parent_repair_history, repair_history=repair_history) - @skip('hangs CI') - def initial_empty_repair_tables_test(self): - debug('repair tables:') - debug(self.repair_table_contents(node=self.node1, include_system_keyspaces=False)) + @pytest.mark.skip(reason='hangs CI') + def test_initial_empty_repair_tables(self): + logger.debug('repair tables:') + logger.debug(self.repair_table_contents(node=self.node1, include_system_keyspaces=False)) repair_tables_dict = self.repair_table_contents(node=self.node1, include_system_keyspaces=False)._asdict() - for table_name, table_contents in repair_tables_dict.items(): - self.assertFalse(table_contents, '{} is non-empty'.format(table_name)) + for table_name, table_contents in list(repair_tables_dict.items()): + assert not table_contents, '{} is non-empty'.format(table_name) - def repair_parent_table_test(self): + def test_repair_parent_table(self): """ Test that `system_distributed.parent_repair_history` is properly populated after repair by: @@ -1299,9 +1297,9 @@ class TestRepairDataSystemTable(Tester): """ self.node1.repair() parent_repair_history, _ = self.repair_table_contents(node=self.node1, include_system_keyspaces=False) - self.assertTrue(len(parent_repair_history)) + assert len(parent_repair_history) - def repair_table_test(self): + def test_repair_table(self): """ Test that `system_distributed.repair_history` is properly populated after repair by: @@ -1311,4 +1309,4 @@ class TestRepairDataSystemTable(Tester): """ self.node1.repair() _, repair_history = self.repair_table_contents(node=self.node1, include_system_keyspaces=False) - self.assertTrue(len(repair_history)) + assert len(repair_history)
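For context, the repair_test.py hunks above follow one consistent nose-to-pytest conversion recipe. Below is a minimal sketch of those patterns, assuming the cassandra-dtest environment; the class name, test name, table and addresses are illustrative only, while the `since` shim, `logger` setup, `fixture_dtest_setup` usage and marker names mirror what the diff introduces:

import logging

import pytest
from dtest import Tester

# @since(...) is now a plain pytest marker; debug(...) calls become logger.debug(...).
since = pytest.mark.since
logger = logging.getLogger(__name__)


class TestConversionSketch(Tester):

    @pytest.fixture(autouse=True)
    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
        # Class-level ignore_log_patterns attributes move into an autouse
        # fixture that configures the shared dtest setup object.
        fixture_dtest_setup.ignore_log_patterns = (r'Streaming error occurred',)

    @since('2.2.1', max_version='4')   # was @since('2.2.1', '4')
    @pytest.mark.no_vnodes             # was @no_vnodes()
    def test_assertion_mapping(self):  # was assertion_mapping_test (nose discovery)
        # unittest helpers map onto bare asserts:
        #   self.assertEqual(a, b, msg) -> assert a == b, msg
        #   self.assertTrue(x)          -> assert x
        #   self.assertFalse(x)         -> assert not x
        #   self.assertIn(i, coll, msg) -> assert i in coll, msg
        out_of_sync_nodes = {'127.0.0.1', '127.0.0.3'}
        valid_pairs = [{'127.0.0.1', '127.0.0.3'}, {'127.0.0.2', '127.0.0.3'}]
        assert out_of_sync_nodes in valid_pairs, str(out_of_sync_nodes)
        logger.debug("assertion mapping verified")

The same recipe carries over to replace_address_test.py below: test methods are renamed to the test_* prefix pytest discovers, @attr('resource-intensive') becomes @pytest.mark.resource_intensive, and per-class log-pattern tuples are set through the fixture rather than class attributes.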
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/replace_address_test.py ---------------------------------------------------------------------- diff --git a/replace_address_test.py b/replace_address_test.py index 2b60077..31c1394 100644 --- a/replace_address_test.py +++ b/replace_address_test.py @@ -1,18 +1,24 @@ import os import tempfile +import pytest +import logging +import time + +from flaky import flaky + from itertools import chain from shutil import rmtree -from unittest import skipIf from cassandra import ConsistencyLevel, ReadTimeout, Unavailable from cassandra.query import SimpleStatement from ccmlib.node import Node -from nose.plugins.attrib import attr -from dtest import CASSANDRA_VERSION_FROM_BUILD, DISABLE_VNODES, Tester, debug +from dtest import CASSANDRA_VERSION_FROM_BUILD, Tester from tools.assertions import assert_bootstrap_state, assert_all, assert_not_running from tools.data import rows_to_list -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) class NodeUnavailable(Exception): @@ -20,24 +26,26 @@ class NodeUnavailable(Exception): class BaseReplaceAddressTest(Tester): - __test__ = False - replacement_node = None - ignore_log_patterns = ( - # This one occurs when trying to send the migration to a - # node that hasn't started yet, and when it does, it gets - # replayed and everything is fine. - r'Can\'t send migration request: node.*is down', - r'Migration task failed to complete', # 10978 - # ignore streaming error during bootstrap - r'Streaming error occurred', - r'failed stream session' - ) + + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = ( + # This one occurs when trying to send the migration to a + # node that hasn't started yet, and when it does, it gets + # replayed and everything is fine. 
+ r'Can\'t send migration request: node.*is down', + r'Migration task failed to complete', # 10978 + # ignore streaming error during bootstrap + r'Streaming error occurred', + r'failed stream session', + r'Failed to properly handshake with peer' + ) def _setup(self, n=3, opts=None, enable_byteman=False, mixed_versions=False): - debug("Starting cluster with {} nodes.".format(n)) - self.cluster.populate(n, use_vnodes=not DISABLE_VNODES) + logger.debug("Starting cluster with {} nodes.".format(n)) + self.cluster.populate(n) if opts is not None: - debug("Setting cluster options: {}".format(opts)) + logger.debug("Setting cluster options: {}".format(opts)) self.cluster.set_configuration_options(opts) self.cluster.set_batch_commitlog(enabled=True) @@ -46,7 +54,7 @@ class BaseReplaceAddressTest(Tester): self.cluster.seeds.remove(self.replaced_node) NUM_TOKENS = os.environ.get('NUM_TOKENS', '256') - if DISABLE_VNODES: + if not self.dtest_config.use_vnodes: self.cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': 1}) else: self.cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': NUM_TOKENS}) @@ -57,7 +65,7 @@ class BaseReplaceAddressTest(Tester): self.query_node.import_config_files() if mixed_versions: - debug("Starting nodes on version 2.2.4") + logger.debug("Starting nodes on version 2.2.4") self.cluster.set_install_dir(version="2.2.4") self.cluster.start() @@ -83,12 +91,12 @@ class BaseReplaceAddressTest(Tester): replacement_address = self.replaced_node.address() self.cluster.remove(self.replaced_node) - debug("Starting replacement node {} with jvm_option '{}={}'".format(replacement_address, jvm_option, replace_address)) + logger.debug("Starting replacement node {} with jvm_option '{}={}'".format(replacement_address, jvm_option, replace_address)) self.replacement_node = Node('replacement', cluster=self.cluster, auto_bootstrap=True, thrift_interface=None, storage_interface=(replacement_address, 7000), jmx_port='7400', remote_debug_port='0', initial_token=None, binary_interface=(replacement_address, 9042)) if opts is not None: - debug("Setting options on replacement node: {}".format(opts)) + logger.debug("Setting options on replacement node: {}".format(opts)) self.replacement_node.set_configuration_options(opts) self.cluster.add(self.replacement_node, False, data_center=data_center) @@ -107,39 +115,41 @@ class BaseReplaceAddressTest(Tester): def _stop_node_to_replace(self, gently=False, table='keyspace1.standard1', cl=ConsistencyLevel.THREE): if self.replaced_node.is_running(): - debug("Stopping {}".format(self.replaced_node.name)) + logger.debug("Stopping {}".format(self.replaced_node.name)) self.replaced_node.stop(gently=gently, wait_other_notice=True) - debug("Testing node stoppage (query should fail).") - with self.assertRaises((Unavailable, ReadTimeout)): + logger.debug("Testing node stoppage (query should fail).") + with pytest.raises((Unavailable, ReadTimeout)): session = self.patient_cql_connection(self.query_node) query = SimpleStatement('select * from {}'.format(table), consistency_level=cl) session.execute(query) def _insert_data(self, n='1k', rf=3, whitelist=False): - debug("Inserting {} entries with rf={} with stress...".format(n, rf)) - self.query_node.stress(['write', 'n={}'.format(n), 'no-warmup', '-schema', 'replication(factor={})'.format(rf)], + logger.debug("Inserting {} entries with rf={} with stress...".format(n, rf)) + self.query_node.stress(['write', 'n={}'.format(n), 'no-warmup', '-schema', 
'replication(factor={})'.format(rf), + '-rate', 'threads=10'], whitelist=whitelist) self.cluster.flush() + time.sleep(20) def _fetch_initial_data(self, table='keyspace1.standard1', cl=ConsistencyLevel.THREE, limit=10000): - debug("Fetching initial data from {} on {} with CL={} and LIMIT={}".format(table, self.query_node.name, cl, limit)) + logger.debug("Fetching initial data from {} on {} with CL={} and LIMIT={}".format(table, self.query_node.name, cl, limit)) session = self.patient_cql_connection(self.query_node) query = SimpleStatement('select * from {} LIMIT {}'.format(table, limit), consistency_level=cl) - return rows_to_list(session.execute(query)) + return rows_to_list(session.execute(query, timeout=20)) def _verify_data(self, initial_data, table='keyspace1.standard1', cl=ConsistencyLevel.ONE, limit=10000, restart_nodes=False): - self.assertGreater(len(initial_data), 0, "Initial data must be greater than 0") + assert len(initial_data) > 0, "Initial data must be greater than 0" # query should work again - debug("Stopping old nodes") + logger.debug("Stopping old nodes") for node in self.cluster.nodelist(): if node.is_running() and node != self.replacement_node: - debug("Stopping {}".format(node.name)) + logger.debug("Stopping {}".format(node.name)) node.stop(gently=False, wait_other_notice=True) - debug("Verifying {} on {} with CL={} and LIMIT={}".format(table, self.replacement_node.address(), cl, limit)) + logger.debug("Verifying {} on {} with CL={} and LIMIT={}".format(table, self.replacement_node.address(), cl, limit)) session = self.patient_exclusive_cql_connection(self.replacement_node) assert_all(session, 'select * from {} LIMIT {}'.format(table, limit), expected=initial_data, @@ -166,22 +176,22 @@ class BaseReplaceAddressTest(Tester): timeout=60) def _verify_tokens_migrated_successfully(self, previous_log_size=None): - if DISABLE_VNODES: + if not self.dtest_config.use_vnodes: num_tokens = 1 else: # a little hacky but grep_log returns the whole line... num_tokens = int(self.replacement_node.get_conf_option('num_tokens')) - debug("Verifying {} tokens migrated sucessfully".format(num_tokens)) + logger.debug("Verifying {} tokens migrated sucessfully".format(num_tokens)) logs = self.replacement_node.grep_log(r"Token (.*?) 
changing ownership from /{} to /{}" .format(self.replaced_node.address(), self.replacement_node.address())) if (previous_log_size is not None): - self.assertEquals(len(logs), previous_log_size) + assert len(logs) == previous_log_size moved_tokens = set([l[1].group(1) for l in logs]) - debug("number of moved tokens: {}".format(len(moved_tokens))) - self.assertEquals(len(moved_tokens), num_tokens) + logger.debug("number of moved tokens: {}".format(len(moved_tokens))) + assert len(moved_tokens) == num_tokens return len(logs) @@ -197,11 +207,11 @@ class BaseReplaceAddressTest(Tester): self._stop_node_to_replace() if mixed_versions: - debug("Upgrading all except {} to current version".format(self.query_node.address())) + logger.debug("Upgrading all except {} to current version".format(self.query_node.address())) self.cluster.set_install_dir(install_dir=default_install_dir) for node in self.cluster.nodelist(): if node.is_running() and node != self.query_node: - debug("Upgrading {} to current version".format(node.address())) + logger.debug("Upgrading {} to current version".format(node.address())) node.stop(gently=True, wait_other_notice=True) node.start(wait_other_notice=True, wait_for_binary_proto=True) @@ -216,7 +226,7 @@ class BaseReplaceAddressTest(Tester): if not same_address and not mixed_versions: initial_data = self._fetch_initial_data(cl=ConsistencyLevel.TWO) - debug("Joining replaced node") + logger.debug("Joining replaced node") self.replacement_node.nodetool("join") if not same_address: @@ -229,33 +239,32 @@ class BaseReplaceAddressTest(Tester): class TestReplaceAddress(BaseReplaceAddressTest): - __test__ = True - @attr('resource-intensive') - def replace_stopped_node_test(self): + @pytest.mark.resource_intensive + def test_replace_stopped_node(self): """ Test that we can replace a node that is not shutdown gracefully. """ self._test_replace_node(gently=False) - @attr('resource-intensive') - def replace_shutdown_node_test(self): + @pytest.mark.resource_intensive + def test_replace_shutdown_node(self): """ @jira_ticket CASSANDRA-9871 Test that we can replace a node that is shutdown gracefully. 
""" self._test_replace_node(gently=True) - @attr('resource-intensive') - def replace_stopped_node_same_address_test(self): + @pytest.mark.resource_intensive + def test_replace_stopped_node_same_address(self): """ @jira_ticket CASSANDRA-8523 Test that we can replace a node with the same address correctly """ self._test_replace_node(gently=False, same_address=True) - @attr('resource-intensive') - def replace_first_boot_test(self): + @pytest.mark.resource_intensive + def test_replace_first_boot(self): self._test_replace_node(jvm_option='replace_address_first_boot') def _test_replace_node(self, gently=False, jvm_option='replace_address', same_address=False): @@ -288,32 +297,35 @@ class TestReplaceAddress(BaseReplaceAddressTest): self._verify_data(initial_data) - @attr('resource-intensive') - def replace_active_node_test(self): - self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Exception encountered during startup'] + @pytest.mark.resource_intensive + def test_replace_active_node(self): + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ + r'Exception encountered during startup'] + self._setup(n=3) self._do_replace(wait_for_binary_proto=False) - debug("Waiting for replace to fail") + logger.debug("Waiting for replace to fail") self.replacement_node.watch_log_for("java.lang.UnsupportedOperationException: Cannot replace a live node...") assert_not_running(self.replacement_node) - @attr('resource-intensive') - def replace_nonexistent_node_test(self): - self.ignore_log_patterns = list(self.ignore_log_patterns) + [ + @pytest.mark.resource_intensive + def test_replace_nonexistent_node(self): + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ # This is caused by starting a node improperly (replacing active/nonexistent) r'Exception encountered during startup', # This is caused by trying to replace a nonexistent node r'Exception in thread Thread'] + self._setup(n=3) self._do_replace(replace_address='127.0.0.5', wait_for_binary_proto=False) - debug("Waiting for replace to fail") + logger.debug("Waiting for replace to fail") self.replacement_node.watch_log_for("java.lang.RuntimeException: Cannot replace_address /127.0.0.5 because it doesn't exist in gossip") assert_not_running(self.replacement_node) @since('3.6') - def fail_without_replace_test(self): + def test_fail_without_replace(self): """ When starting a node from a clean slate with the same address as an existing down node, the node should error out even when @@ -321,14 +333,16 @@ class TestReplaceAddress(BaseReplaceAddressTest): to use replace_address. 
@jira_ticket CASSANDRA-10134 """ - self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Exception encountered during startup'] + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ + r'Exception encountered during startup'] + self._setup(n=3) self._insert_data() node1, node2, node3 = self.cluster.nodelist() mark = None for auto_bootstrap in (True, False): - debug("Stopping node 3.") + logger.debug("Stopping node 3.") node3.stop(gently=False) # completely delete the data, commitlog, and saved caches @@ -339,13 +353,13 @@ class TestReplaceAddress(BaseReplaceAddressTest): rmtree(d) node3.set_configuration_options(values={'auto_bootstrap': auto_bootstrap}) - debug("Starting node 3 with auto_bootstrap = {val}".format(val=auto_bootstrap)) + logger.debug("Starting node 3 with auto_bootstrap = {val}".format(val=auto_bootstrap)) node3.start(wait_other_notice=False) node3.watch_log_for('Use cassandra.replace_address if you want to replace this node', from_mark=mark, timeout=20) mark = node3.mark_log() @since('3.6') - def unsafe_replace_test(self): + def test_unsafe_replace(self): """ To handle situations such as failed disk in a JBOD, it may be desirable to replace a node without bootstrapping. In such scenarios best practice @@ -359,14 +373,16 @@ class TestReplaceAddress(BaseReplaceAddressTest): @jira_ticket CASSANDRA-10134 """ - self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Exception encountered during startup'] + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ + r'Exception encountered during startup'] + self._setup(n=3) self._insert_data() initial_data = self._fetch_initial_data() self.replacement_node = self.replaced_node for set_allow_unsafe_flag in [False, True]: - debug("Stopping {}".format(self.replaced_node.name)) + logger.debug("Stopping {}".format(self.replaced_node.name)) self.replaced_node.stop(gently=False) # completely delete the system keyspace data plus commitlog and saved caches @@ -384,27 +400,27 @@ class TestReplaceAddress(BaseReplaceAddressTest): mark = self.replacement_node.mark_log() if set_allow_unsafe_flag: - debug('Starting replacement node with auto_bootstrap = false and replace_address = {} and allow_unsafe_replace = true'.format(self.replaced_node.address())) + logger.debug('Starting replacement node with auto_bootstrap = false and replace_address = {} and allow_unsafe_replace = true'.format(self.replaced_node.address())) self._do_replace(extra_jvm_args=['-Dcassandra.allow_unsafe_replace=true']) self._verify_data(initial_data) else: - debug('Starting replacement node with auto_bootstrap = false and replace_address = {}'.format(self.replaced_node.address())) + logger.debug('Starting replacement node with auto_bootstrap = false and replace_address = {}'.format(self.replaced_node.address())) self._do_replace(wait_for_binary_proto=False) self.replacement_node.watch_log_for('To perform this operation, please restart with -Dcassandra.allow_unsafe_replace=true', from_mark=mark, timeout=20) - @skipIf(CASSANDRA_VERSION_FROM_BUILD == '3.9', "Test doesn't run on 3.9") + @pytest.mark.skipif(CASSANDRA_VERSION_FROM_BUILD == '3.9', reason="Test doesn't run on 3.9") @since('2.2') - def insert_data_during_replace_same_address_test(self): + def test_insert_data_during_replace_same_address(self): """ Test that replacement node with same address DOES NOT receive writes during replacement @jira_ticket CASSANDRA-8523 """ 
self._test_insert_data_during_replace(same_address=True) - @skipIf(CASSANDRA_VERSION_FROM_BUILD == '3.9', "Test doesn't run on 3.9") + @pytest.mark.skipif(CASSANDRA_VERSION_FROM_BUILD == '3.9', reason="Test doesn't run on 3.9") @since('2.2') - def insert_data_during_replace_different_address_test(self): + def test_insert_data_during_replace_different_address(self): """ Test that replacement node with different address DOES receive writes during replacement @jira_ticket CASSANDRA-8523 @@ -412,8 +428,8 @@ class TestReplaceAddress(BaseReplaceAddressTest): self._test_insert_data_during_replace(same_address=False) @since('2.2') - @attr('resource-intensive') - def resume_failed_replace_test(self): + @pytest.mark.resource_intensive + def test_resume_failed_replace(self): """ Test resumable bootstrap while replacing node. Feature introduced in 2.2 with ticket https://issues.apache.org/jira/browse/CASSANDRA-8838 @@ -423,21 +439,23 @@ class TestReplaceAddress(BaseReplaceAddressTest): self._test_restart_failed_replace(mode='resume') @since('2.2') - @attr('resource-intensive') - def restart_failed_replace_with_reset_resume_state_test(self): + @pytest.mark.resource_intensive + def test_restart_failed_replace_with_reset_resume_state(self): """Test replace with resetting bootstrap progress""" self._test_restart_failed_replace(mode='reset_resume_state') @since('2.2') - @attr('resource-intensive') - def restart_failed_replace_test(self): + @pytest.mark.resource_intensive + def test_restart_failed_replace(self): """ Test that if a node fails to replace, it can join the cluster even if the data is wiped. """ self._test_restart_failed_replace(mode='wipe') def _test_restart_failed_replace(self, mode): - self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Error while waiting on bootstrap to complete'] + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ + r'Error while waiting on bootstrap to complete'] + self._setup(n=3, enable_byteman=True) self._insert_data(n="1k") @@ -445,7 +463,7 @@ class TestReplaceAddress(BaseReplaceAddressTest): self._stop_node_to_replace() - debug("Submitting byteman script to make stream fail") + logger.debug("Submitting byteman script to make stream fail") if self.cluster.version() < '4.0': self.query_node.byteman_submit(['./byteman/pre4.0/stream_failure.btm']) @@ -460,7 +478,7 @@ class TestReplaceAddress(BaseReplaceAddressTest): if mode == 'reset_resume_state': mark = self.replacement_node.mark_log() - debug("Restarting replacement node with -Dcassandra.reset_bootstrap_progress=true") + logger.debug("Restarting replacement node with -Dcassandra.reset_bootstrap_progress=true") # restart replacement node with resetting bootstrap state self.replacement_node.stop() self.replacement_node.start(jvm_args=[ @@ -471,7 +489,7 @@ class TestReplaceAddress(BaseReplaceAddressTest): # check if we reset bootstrap state self.replacement_node.watch_log_for("Resetting bootstrap progress to start fresh", from_mark=mark) elif mode == 'resume': - debug("Resuming failed bootstrap") + logger.debug("Resuming failed bootstrap") self.replacement_node.nodetool('bootstrap resume') # check if we skipped already retrieved ranges self.replacement_node.watch_log_for("already available. 
Skipping streaming.") @@ -479,37 +497,39 @@ class TestReplaceAddress(BaseReplaceAddressTest): elif mode == 'wipe': self.replacement_node.stop() - debug("Waiting other nodes to detect node stopped") - self.query_node.watch_log_for("FatClient /{} has been silent for 30000ms, removing from gossip".format(self.replacement_node.address()), timeout=60) - self.query_node.watch_log_for("Node /{} failed during replace.".format(self.replacement_node.address()), timeout=60, filename='debug.log') + logger.debug("Waiting other nodes to detect node stopped") + self.query_node.watch_log_for("FatClient /{} has been silent for 30000ms, removing from gossip".format(self.replacement_node.address()), timeout=120) + self.query_node.watch_log_for("Node /{} failed during replace.".format(self.replacement_node.address()), timeout=120, filename='debug.log') - debug("Restarting node after wiping data") + logger.debug("Restarting node after wiping data") self._cleanup(self.replacement_node) self.replacement_node.start(jvm_args=["-Dcassandra.replace_address_first_boot={}".format(self.replaced_node.address())], wait_for_binary_proto=True) else: - raise RuntimeError('invalid mode value {mode}'.format(mode)) + raise RuntimeError('invalid mode value {mode}'.format(mode=mode)) # check if bootstrap succeeded assert_bootstrap_state(self, self.replacement_node, 'COMPLETED') - debug("Bootstrap finished successully, verifying data.") + logger.debug("Bootstrap finished successully, verifying data.") self._verify_data(initial_data) - def replace_with_insufficient_replicas_test(self): + def test_replace_with_insufficient_replicas(self): """ Test that replace fails when there are insufficient replicas @jira_ticket CASSANDRA-11848 """ - self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Unable to find sufficient sources for streaming range'] + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ + r'Unable to find sufficient sources for streaming range'] + self._setup(n=3) self._insert_data(rf=2) self._stop_node_to_replace() # stop other replica - debug("Stopping other replica") + logger.debug("Stopping other replica") self.query_node.stop(wait_other_notice=True) self._do_replace(wait_for_binary_proto=False, wait_other_notice=False) @@ -518,7 +538,9 @@ class TestReplaceAddress(BaseReplaceAddressTest): self.replacement_node.watch_log_for("Unable to find sufficient sources for streaming range") assert_not_running(self.replacement_node) - def multi_dc_replace_with_rf1_test(self): + @flaky + @pytest.mark.vnodes + def test_multi_dc_replace_with_rf1(self): """ Test that multi-dc replace works when rf=1 on each dc """ @@ -550,7 +572,11 @@ class TestReplaceAddress(BaseReplaceAddressTest): stress_config.write(yaml_config) stress_config.flush() self.query_node.stress(['user', 'profile=' + stress_config.name, 'n=10k', 'no-warmup', - 'ops(insert=1)', '-rate', 'threads=50']) + 'ops(insert=1)', '-rate', 'threads=5']) + # need to sleep for a bit to try and let things catch up as we frequently do a lot of + # GC after the stress invocation above causing the next step of the test to timeout. 
+ # and then flush to make sure we really are fully caught up + time.sleep(30) # Save initial data table_name = 'keyspace1.users' @@ -563,13 +589,13 @@ class TestReplaceAddress(BaseReplaceAddressTest): assert_bootstrap_state(self, self.replacement_node, 'COMPLETED') # Check that keyspace was replicated from dc1 to dc2 - self.assertFalse(self.replacement_node.grep_log("Unable to find sufficient sources for streaming range")) + assert not self.replacement_node.grep_log("Unable to find sufficient sources for streaming range") self._verify_data(initial_data, table=table_name, cl=ConsistencyLevel.LOCAL_ONE) def _cleanup(self, node): commitlog_dir = os.path.join(node.get_path(), 'commitlogs') for data_dir in node.data_directories(): - debug("Deleting {}".format(data_dir)) + logger.debug("Deleting {}".format(data_dir)) rmtree(data_dir) rmtree(commitlog_dir)
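
The hunks above all follow the same nose-to-pytest conversion: `@attr(...)` becomes a `pytest.mark` marker, `@skipIf` becomes `@pytest.mark.skipif(..., reason=...)`, `*_test` methods are renamed to `test_*`, `debug(...)` calls go through a module-level `logger`, unittest assertions become bare `assert`s, and per-test log whitelisting moves onto `self.fixture_dtest_setup`. The following is a minimal illustrative sketch of that pattern only; the class name, test body, and values here are hypothetical and are not part of the patch.

```python
import logging
import pytest

from dtest import Tester  # dtest base class used by these tests

logger = logging.getLogger(__name__)


class TestExampleConversion(Tester):

    @pytest.mark.resource_intensive              # was: @attr('resource-intensive')
    @pytest.mark.skipif(False, reason="example")  # was: @skipIf(cond, "msg")
    def test_example(self):                       # was: def example_test(self)
        logger.debug("starting example")          # was: debug("starting example")

        # whitelist expected errors on the pytest fixture object
        # (was: self.ignore_log_patterns = ...)
        self.fixture_dtest_setup.ignore_log_patterns = \
            list(self.fixture_dtest_setup.ignore_log_patterns) + [
                r'Exception encountered during startup']

        rows = [1, 2, 3]
        assert len(rows) == 3                     # was: self.assertEqual(len(rows), 3)
        assert not []                             # was: self.assertFalse([])
```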
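
For readers following `_test_restart_failed_replace`, the three recovery modes it exercises boil down to the ccmlib `Node` calls already visible in the hunks. The sketch below condenses them into one hypothetical helper; it is not the test's actual body (the full `jvm_args` of the `reset_resume_state` branch fall outside the context shown), and the `replacement_node`/`replaced_node` parameters are placeholders.

```python
def recover_failed_replace(replacement_node, replaced_node, mode):
    """Retry a node replacement that failed mid-stream, in one of three ways."""
    if mode == 'resume':
        # continue from the ranges that were already streamed
        replacement_node.nodetool('bootstrap resume')
    elif mode == 'reset_resume_state':
        # restart, discarding the saved bootstrap progress
        replacement_node.stop()
        replacement_node.start(
            jvm_args=['-Dcassandra.reset_bootstrap_progress=true'],
            wait_for_binary_proto=True)
    elif mode == 'wipe':
        # wipe data directories first, then replace from a clean first boot
        replacement_node.stop()
        replacement_node.start(
            jvm_args=['-Dcassandra.replace_address_first_boot={}'.format(
                replaced_node.address())],
            wait_for_binary_proto=True)
    else:
        raise RuntimeError('invalid mode value {mode}'.format(mode=mode))
```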