http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/read_repair_test.py ---------------------------------------------------------------------- diff --git a/read_repair_test.py b/read_repair_test.py index 57fbf40..7e8d405 100644 --- a/read_repair_test.py +++ b/read_repair_test.py @@ -1,23 +1,28 @@ import time +import pytest +import logging from cassandra import ConsistencyLevel from cassandra.query import SimpleStatement +from dtest import Tester, create_ks from tools.assertions import assert_one -from dtest import PRINT_DEBUG, Tester, debug, create_ks from tools.data import rows_to_list -from tools.decorators import since +from tools.misc import retry_till_success + +since = pytest.mark.since +logger = logging.getLogger(__name__) class TestReadRepair(Tester): - def setUp(self): - Tester.setUp(self) - self.cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) - self.cluster.populate(3).start(wait_for_binary_proto=True) + @pytest.fixture(scope='function', autouse=True) + def fixture_set_cluster_settings(self, fixture_dtest_setup): + fixture_dtest_setup.cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) + fixture_dtest_setup.cluster.populate(3).start(wait_for_binary_proto=True) @since('3.0') - def alter_rf_and_run_read_repair_test(self): + def test_alter_rf_and_run_read_repair(self): """ @jira_ticket CASSANDRA-10655 @jira_ticket CASSANDRA-10657 @@ -25,90 +30,127 @@ class TestReadRepair(Tester): Test that querying only a subset of all the columns in a row doesn't confuse read-repair to avoid the problem described in CASSANDRA-10655. """ - self._test_read_repair() + + # session is only used to setup & do schema modification. Actual data queries are done directly on + # each node, using an exclusive connection and CL.ONE + session = self.patient_cql_connection(self.cluster.nodelist()[0]) + initial_replica, non_replicas = self.do_initial_setup(session) + + # Execute a query at CL.ALL on one of the nodes which was *not* the initial replica. It should trigger a + # read repair and propagate the data to all 3 nodes. + # Note: result of the read repair contains only the selected column (a), not all columns + logger.debug("Executing 'SELECT a...' on non-initial replica to trigger read repair " + non_replicas[0].name) + read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0]) + assert_one(read_repair_session, "SELECT a FROM alter_rf_test.t1 WHERE k=1", [1], cl=ConsistencyLevel.ALL) + + # The read repair should have repaired the replicas, at least partially (see CASSANDRA-10655) + # verify by querying each replica in turn. + self.check_data_on_each_replica(expect_fully_repaired=False, initial_replica=initial_replica) + + # Now query again at CL.ALL but this time selecting all columns, which should ensure that 'b' also gets repaired + query = "SELECT * FROM alter_rf_test.t1 WHERE k=1" + logger.debug("Executing 'SELECT *...' on non-initial replica to trigger read repair " + non_replicas[0].name) + assert_one(read_repair_session, query, [1, 1, 1], cl=ConsistencyLevel.ALL) + + # Check each replica individually again now that we expect the data to be fully repaired + self.check_data_on_each_replica(expect_fully_repaired=True, initial_replica=initial_replica) def test_read_repair_chance(self): """ @jira_ticket CASSANDRA-12368 """ - self._test_read_repair(cl_all=False) - - def _test_read_repair(self, cl_all=True): + # session is only used to setup & do schema modification. 
Actual data queries are done directly on + # each node, using an exclusive connection and CL.ONE session = self.patient_cql_connection(self.cluster.nodelist()[0]) + initial_replica, non_replicas = self.do_initial_setup(session) + + # To ensure read repairs are triggered, set the table property to 100% + logger.debug("Setting table read repair chance to 1") + session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 1;""") + + # Execute a query at CL.ONE on one of the nodes which was *not* the initial replica. It should trigger a + # read repair because read_repair_chance == 1, and propagate the data to all 3 nodes. + # Note: result of the read repair contains only the selected column (a), not all columns, so we won't expect + # 'b' to have been fully repaired afterwards. + logger.debug("Executing 'SELECT a...' on non-initial replica to trigger read repair " + non_replicas[0].name) + read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0]) + read_repair_session.execute(SimpleStatement("SELECT a FROM alter_rf_test.t1 WHERE k=1", + consistency_level=ConsistencyLevel.ONE)) + + # Query each replica individually to ensure that read repair was triggered. We should expect that only + # the initial replica has data for both the 'a' and 'b' columns. The read repair should only have affected + # the selected column, so the other two replicas should only have that data. + # Note: we need to temporarily set read_repair_chance to 0 while we perform this check. + logger.debug("Setting table read repair chance to 0 while we verify each replica's data") + session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 0;""") + # The read repair is run in the background, so we spin while checking that the repair has completed + retry_till_success(self.check_data_on_each_replica, + expect_fully_repaired=False, + initial_replica=initial_replica, + timeout=30, + bypassed_exception=NotRepairedException) + + # Re-enable global read repair and perform another query on a non-replica. This time the query selects all + # columns so we also expect the value for 'b' to be repaired. + logger.debug("Setting table read repair chance to 1") + session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 1;""") + logger.debug("Executing 'SELECT *...' on non-initial replica to trigger read repair " + non_replicas[0].name) + read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0]) + read_repair_session.execute(SimpleStatement("SELECT * FROM alter_rf_test.t1 WHERE k=1", + consistency_level=ConsistencyLevel.ONE)) + + # Query each replica again to ensure that second read repair was triggered. This time, we expect the + # data to be fully repaired (both 'a' and 'b' columns) by virtue of the query being 'SELECT *...' + # As before, we turn off read repair before doing this check. + logger.debug("Setting table read repair chance to 0 while we verify each replica's data") + session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 0;""") + retry_till_success(self.check_data_on_each_replica, + expect_fully_repaired=True, + initial_replica=initial_replica, + timeout=30, + bypassed_exception=NotRepairedException) + + def do_initial_setup(self, session): + """ + Create a keyspace with rf=1 and a table containing a single row with 2 non-primary key columns. + Insert 1 row, placing the data on a single initial replica. Then, alter the keyspace to rf=3, but don't + repair. 
Tests will execute various reads on the replicas and assert the effects of read repair. + :param session: Used to perform the schema setup & insert the data + :return: a tuple containing the node which initially acts as the replica, and a list of the other two nodes + """ + # Disable speculative retry to make it clear that we only query additional nodes because of read_repair_chance session.execute("""CREATE KEYSPACE alter_rf_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};""") - session.execute("CREATE TABLE alter_rf_test.t1 (k int PRIMARY KEY, a int, b int);") + session.execute("CREATE TABLE alter_rf_test.t1 (k int PRIMARY KEY, a int, b int) WITH speculative_retry='NONE';") session.execute("INSERT INTO alter_rf_test.t1 (k, a, b) VALUES (1, 1, 1);") - cl_one_stmt = SimpleStatement("SELECT * FROM alter_rf_test.t1 WHERE k=1", - consistency_level=ConsistencyLevel.ONE) # identify the initial replica and trigger a flush to ensure reads come from sstables - initial_replica, non_replicas = self.identify_initial_placement('alter_rf_test', 't1', 1) - debug("At RF=1 replica for data is " + initial_replica.name) + initial_replica, non_replicas = self.identify_initial_placement() + logger.debug("At RF=1 replica for data is " + initial_replica.name) initial_replica.flush() + # Just some basic validation. # At RF=1, it shouldn't matter which node we query, as the actual data should always come from the # initial replica when reading at CL ONE for n in self.cluster.nodelist(): - debug("Checking " + n.name) + logger.debug("Checking " + n.name) session = self.patient_exclusive_cql_connection(n) assert_one(session, "SELECT * FROM alter_rf_test.t1 WHERE k=1", [1, 1, 1], cl=ConsistencyLevel.ONE) - # Alter so RF=n but don't repair, then execute a query which selects only a subset of the columns. Run this at - # CL ALL on one of the nodes which doesn't currently have the data, triggering a read repair. - # The expectation will be that every replicas will have been repaired for that column (but we make no assumptions - # on the other columns). - debug("Changing RF from 1 to 3") + # Alter so RF=n but don't repair, calling tests will execute queries to exercise read repair, + # either at CL.ALL or after setting read_repair_chance to 100%. 
+ logger.debug("Changing RF from 1 to 3") session.execute("""ALTER KEYSPACE alter_rf_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};""") - if not cl_all: - debug("Setting table read repair chance to 1") - session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 1;""") - - cl = ConsistencyLevel.ALL if cl_all else ConsistencyLevel.ONE - - debug("Executing SELECT on non-initial replica to trigger read repair " + non_replicas[0].name) - read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0]) - - if cl_all: - # result of the read repair query at cl=ALL contains only the selected column - assert_one(read_repair_session, "SELECT a FROM alter_rf_test.t1 WHERE k=1", [1], cl=cl) - else: - # With background read repair at CL=ONE, result may or may not be correct - stmt = SimpleStatement("SELECT a FROM alter_rf_test.t1 WHERE k=1", consistency_level=cl) - session.execute(stmt) - - # Check the results of the read repair by querying each replica again at CL ONE - debug("Re-running SELECTs at CL ONE to verify read repair") - for n in self.cluster.nodelist(): - debug("Checking " + n.name) - session = self.patient_exclusive_cql_connection(n) - res = rows_to_list(session.execute(cl_one_stmt)) - # Column a must be 1 everywhere, and column b must be either 1 or None everywhere - self.assertIn(res[0][:2], [[1, 1], [1, None]]) - - # Now query selecting all columns - query = "SELECT * FROM alter_rf_test.t1 WHERE k=1" - debug("Executing SELECT on non-initial replica to trigger read repair " + non_replicas[0].name) - read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0]) - - if cl_all: - # result of the read repair query at cl=ALL should contain all columns - assert_one(session, query, [1, 1, 1], cl=cl) - else: - # With background read repair at CL=ONE, result may or may not be correct - stmt = SimpleStatement(query, consistency_level=cl) - session.execute(stmt) - - # Check all replica is fully up to date - debug("Re-running SELECTs at CL ONE to verify read repair") - for n in self.cluster.nodelist(): - debug("Checking " + n.name) - session = self.patient_exclusive_cql_connection(n) - assert_one(session, query, [1, 1, 1], cl=ConsistencyLevel.ONE) + return initial_replica, non_replicas - def identify_initial_placement(self, keyspace, table, key): + def identify_initial_placement(self): + """ + Identify which node in the 3 node cluster contains the specific key at the point that the test keyspace has + rf=1. + :return: tuple containing the initial replica, plus a list of the other 2 replicas. + """ nodes = self.cluster.nodelist() out, _, _ = nodes[0].nodetool("getendpoints alter_rf_test t1 1") address = out.split('\n')[-2] @@ -120,12 +162,31 @@ class TestReadRepair(Tester): else: non_replicas.append(node) - self.assertIsNotNone(initial_replica, "Couldn't identify initial replica") + assert initial_replica is not None, "Couldn't identify initial replica" return initial_replica, non_replicas + def check_data_on_each_replica(self, expect_fully_repaired, initial_replica): + """ + Perform a SELECT * query at CL.ONE on each replica in turn. If expect_fully_repaired is True, we verify that + each replica returns the full row being queried. If not, then we only verify that the 'a' column has been + repaired. 
+ """ + stmt = SimpleStatement("SELECT * FROM alter_rf_test.t1 WHERE k=1", consistency_level=ConsistencyLevel.ONE) + logger.debug("Checking if read repair has completed on all replicas") + for n in self.cluster.nodelist(): + logger.debug("Checking {n}, {x}expecting all columns" + .format(n=n.name, x="" if expect_fully_repaired or n == initial_replica else "not ")) + session = self.patient_exclusive_cql_connection(n) + res = rows_to_list(session.execute(stmt)) + logger.debug("Actual result: " + str(res)) + expected = [[1, 1, 1]] if expect_fully_repaired or n == initial_replica else [[1, 1, None]] + if res != expected: + raise NotRepairedException() + + @since('2.0') - def range_slice_query_with_tombstones_test(self): + def test_range_slice_query_with_tombstones(self): """ @jira_ticket CASSANDRA-8989 @jira_ticket CASSANDRA-9502 @@ -174,9 +235,9 @@ class TestReadRepair(Tester): for trace_event in trace.events: # Step 1, find coordinator node: activity = trace_event.description - self.assertNotIn("Appending to commitlog", activity) - self.assertNotIn("Adding to cf memtable", activity) - self.assertNotIn("Acquiring switchLock read lock", activity) + assert "Appending to commitlog" not in activity + assert "Adding to cf memtable" not in activity + assert "Acquiring switchLock read lock" not in activity @since('3.0') def test_gcable_tombstone_resurrection_on_range_slice_query(self): @@ -221,12 +282,21 @@ class TestReadRepair(Tester): self.pprint_trace(trace) for trace_event in trace.events: activity = trace_event.description - self.assertNotIn("Sending READ_REPAIR message", activity) + assert "Sending READ_REPAIR message" not in activity def pprint_trace(self, trace): """Pretty print a trace""" - if PRINT_DEBUG: - print("-" * 40) + if logging.root.level == logging.DEBUG: + print(("-" * 40)) for t in trace.events: - print("%s\t%s\t%s\t%s" % (t.source, t.source_elapsed, t.description, t.thread_name)) - print("-" * 40) + print(("%s\t%s\t%s\t%s" % (t.source, t.source_elapsed, t.description, t.thread_name))) + print(("-" * 40)) + + +class NotRepairedException(Exception): + """ + Thrown to indicate that the data on a replica doesn't match what we'd expect if a + specific read repair has run. See check_data_on_each_replica. + """ + pass +
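The test_read_repair_chance path above leans on retry_till_success from tools.misc: because that read repair runs in the background, check_data_on_each_replica is polled until it stops raising NotRepairedException or the timeout expires. The helper's implementation is not part of this patch; the following is only a minimal sketch of the retry semantics the test relies on (the default timeout and the sleep interval are assumptions, not taken from tools.misc):

import time

def retry_till_success(fun, *args, **kwargs):
    # Sketch only: assumes 'timeout' (seconds) and 'bypassed_exception' are
    # reserved keyword arguments; everything else is forwarded to fun on each attempt.
    timeout = kwargs.pop('timeout', 60)
    bypassed_exception = kwargs.pop('bypassed_exception', Exception)
    deadline = time.time() + timeout
    while True:
        try:
            return fun(*args, **kwargs)
        except bypassed_exception:
            if time.time() >= deadline:
                raise  # give up and surface the last failure
            time.sleep(0.5)  # assumed brief back-off between attempts

In the test above it is invoked as retry_till_success(self.check_data_on_each_replica, expect_fully_repaired=False, initial_replica=initial_replica, timeout=30, bypassed_exception=NotRepairedException), so the per-replica check is simply re-run until the background repair has landed everywhere or 30 seconds have elapsed.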
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/rebuild_test.py ---------------------------------------------------------------------- diff --git a/rebuild_test.py b/rebuild_test.py index 795c945..919bd54 100644 --- a/rebuild_test.py +++ b/rebuild_test.py @@ -1,26 +1,36 @@ +import pytest import time +import logging + +from flaky import flaky + from threading import Thread from cassandra import ConsistencyLevel from ccmlib.node import ToolError -from dtest import Tester, debug, create_ks, create_cf +from dtest import Tester, create_ks, create_cf from tools.data import insert_c1c2, query_c1c2 -from tools.decorators import since, no_vnodes + +since = pytest.mark.since +logger = logging.getLogger(__name__) class TestRebuild(Tester): - ignore_log_patterns = ( - # This one occurs when trying to send the migration to a - # node that hasn't started yet, and when it does, it gets - # replayed and everything is fine. - r'Can\'t send migration request: node.*is down', - # ignore streaming error during bootstrap - r'Exception encountered during startup', - r'Streaming error occurred' - ) - - def simple_rebuild_test(self): + + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = ( + # This one occurs when trying to send the migration to a + # node that hasn't started yet, and when it does, it gets + # replayed and everything is fine. + r'Can\'t send migration request: node.*is down', + # ignore streaming error during bootstrap + r'Exception encountered during startup', + r'Streaming error occurred' + ) + + def test_simple_rebuild(self): """ @jira_ticket CASSANDRA-9119 @@ -48,7 +58,7 @@ class TestRebuild(Tester): insert_c1c2(session, n=keys, consistency=ConsistencyLevel.LOCAL_ONE) # check data - for i in xrange(0, keys): + for i in range(0, keys): query_c1c2(session, i, ConsistencyLevel.LOCAL_ONE) session.shutdown() @@ -118,31 +128,35 @@ class TestRebuild(Tester): # manually raise exception from cmd1 thread # see http://stackoverflow.com/a/1854263 if cmd1.thread_exc_info is not None: - raise cmd1.thread_exc_info[1], None, cmd1.thread_exc_info[2] + raise cmd1.thread_exc_info[1].with_traceback(cmd1.thread_exc_info[2]) # exactly 1 of the two nodetool calls should fail # usually it will be the one in the main thread, # but occasionally it wins the race with the one in the secondary thread, # so we check that one succeeded and the other failed - self.assertEqual(self.rebuild_errors, 1, - msg='rebuild errors should be 1, but found {}. Concurrent rebuild should not be allowed, but one rebuild command should have succeeded.'.format(self.rebuild_errors)) + assert self.rebuild_errors == 1, \ + 'rebuild errors should be 1, but found {}. 
Concurrent rebuild should not be allowed, but one rebuild command should have succeeded.'.format(self.rebuild_errors) # check data - for i in xrange(0, keys): + for i in range(0, keys): query_c1c2(session, i, ConsistencyLevel.LOCAL_ONE) + @flaky @since('2.2') - def resumable_rebuild_test(self): + def test_resumable_rebuild(self): """ @jira_ticket CASSANDRA-10810 Test rebuild operation is resumable """ - self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Error while rebuilding node', - r'Streaming error occurred on session with peer 127.0.0.3', - r'Remote peer 127.0.0.3 failed stream session', - r'Streaming error occurred on session with peer 127.0.0.3:7000', - r'Remote peer 127.0.0.3:7000 failed stream session'] + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ + r'Error while rebuilding node', + r'Streaming error occurred on session with peer 127.0.0.3', + r'Remote peer 127.0.0.3 failed stream session', + r'Streaming error occurred on session with peer 127.0.0.3:7000', + r'Remote peer 127.0.0.3:7000 failed stream session' + ] + cluster = self.cluster cluster.set_configuration_options(values={'endpoint_snitch': 'org.apache.cassandra.locator.PropertyFileSnitch'}) @@ -203,32 +217,32 @@ class TestRebuild(Tester): node3.byteman_submit(script) # First rebuild must fail and data must be incomplete - with self.assertRaises(ToolError, msg='Unexpected: SUCCEED'): - debug('Executing first rebuild -> '), + with pytest.raises(ToolError, msg='Unexpected: SUCCEED'): + logger.debug('Executing first rebuild -> '), node3.nodetool('rebuild dc1') - debug('Expected: FAILED') + logger.debug('Expected: FAILED') session.execute('USE ks') - with self.assertRaises(AssertionError, msg='Unexpected: COMPLETE'): - debug('Checking data is complete -> '), - for i in xrange(0, 20000): + with pytest.raises(AssertionError, msg='Unexpected: COMPLETE'): + logger.debug('Checking data is complete -> '), + for i in range(0, 20000): query_c1c2(session, i, ConsistencyLevel.LOCAL_ONE) - debug('Expected: INCOMPLETE') + logger.debug('Expected: INCOMPLETE') - debug('Executing second rebuild -> '), + logger.debug('Executing second rebuild -> '), node3.nodetool('rebuild dc1') - debug('Expected: SUCCEED') + logger.debug('Expected: SUCCEED') # Check all streaming sessions completed, streamed ranges are skipped and verify streamed data node3.watch_log_for('All sessions completed') node3.watch_log_for('Skipping streaming those ranges.') - debug('Checking data is complete -> '), - for i in xrange(0, 20000): + logger.debug('Checking data is complete -> '), + for i in range(0, 20000): query_c1c2(session, i, ConsistencyLevel.LOCAL_ONE) - debug('Expected: COMPLETE') + logger.debug('Expected: COMPLETE') @since('3.6') - def rebuild_ranges_test(self): + def test_rebuild_ranges(self): """ @jira_ticket CASSANDRA-10406 """ @@ -285,16 +299,16 @@ class TestRebuild(Tester): # check data is sent by stopping node1 node1.stop() - for i in xrange(0, keys): + for i in range(0, keys): query_c1c2(session, i, ConsistencyLevel.ONE) # ks2 should not be streamed session.execute('USE ks2') - for i in xrange(0, keys): + for i in range(0, keys): query_c1c2(session, i, ConsistencyLevel.ONE, tolerate_missing=True, must_be_missing=True) @since('3.10') - @no_vnodes() - def disallow_rebuild_nonlocal_range_test(self): + @pytest.mark.no_vnodes + def test_disallow_rebuild_nonlocal_range(self): """ @jira_ticket CASSANDRA-9875 Verifies that nodetool rebuild throws an error when an operator @@ -322,12 +336,12 @@ 
class TestRebuild(Tester): session = self.patient_exclusive_cql_connection(node1) session.execute("CREATE KEYSPACE ks1 WITH replication = {'class':'SimpleStrategy', 'replication_factor':2};") - with self.assertRaisesRegexp(ToolError, 'is not a range that is owned by this node'): + with pytest.raises(ToolError, match='is not a range that is owned by this node'): node1.nodetool('rebuild -ks ks1 -ts (%s,%s]' % (node1_token, node2_token)) @since('3.10') - @no_vnodes() - def disallow_rebuild_from_nonreplica_test(self): + @pytest.mark.no_vnodes + def test_disallow_rebuild_from_nonreplica(self): """ @jira_ticket CASSANDRA-9875 Verifies that nodetool rebuild throws an error when an operator @@ -358,12 +372,12 @@ class TestRebuild(Tester): session = self.patient_exclusive_cql_connection(node1) session.execute("CREATE KEYSPACE ks1 WITH replication = {'class':'SimpleStrategy', 'replication_factor':2};") - with self.assertRaisesRegexp(ToolError, 'Unable to find sufficient sources for streaming range'): + with pytest.raises(ToolError, message='Unable to find sufficient sources for streaming range'): node1.nodetool('rebuild -ks ks1 -ts (%s,%s] -s %s' % (node3_token, node1_token, node3_address)) @since('3.10') - @no_vnodes() - def rebuild_with_specific_sources_test(self): + @pytest.mark.no_vnodes + def test_rebuild_with_specific_sources(self): """ @jira_ticket CASSANDRA-9875 Verifies that an operator can specify specific sources to use @@ -426,18 +440,18 @@ class TestRebuild(Tester): # verify that node2 streamed to node3 log_matches = node2.grep_log('Session with %s is complete' % node3.address_for_current_version()) - self.assertTrue(len(log_matches) > 0) + assert len(log_matches) > 0 # verify that node1 did not participate log_matches = node1.grep_log('streaming plan for Rebuild') - self.assertEqual(len(log_matches), 0) + assert len(log_matches) == 0 # check data is sent by stopping node1, node2 node1.stop() node2.stop() - for i in xrange(0, keys): + for i in range(0, keys): query_c1c2(session, i, ConsistencyLevel.ONE) # ks2 should not be streamed session.execute('USE ks2') - for i in xrange(0, keys): + for i in range(0, keys): query_c1c2(session, i, ConsistencyLevel.ONE, tolerate_missing=True, must_be_missing=True) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/repair_tests/deprecated_repair_test.py ---------------------------------------------------------------------- diff --git a/repair_tests/deprecated_repair_test.py b/repair_tests/deprecated_repair_test.py index e438f53..4c60664 100644 --- a/repair_tests/deprecated_repair_test.py +++ b/repair_tests/deprecated_repair_test.py @@ -1,15 +1,20 @@ +import pytest +import logging + from distutils.version import LooseVersion from cassandra import ConsistencyLevel from ccmlib.common import is_win -from dtest import Tester, debug, create_ks, create_cf +from dtest import Tester, create_ks, create_cf from tools.assertions import assert_length_equal from tools.data import insert_c1c2 -from tools.decorators import since from tools.jmxutils import (JolokiaAgent, make_mbean, remove_perf_disable_shared_mem) +since = pytest.mark.since +logger = logging.getLogger(__name__) + @since("2.2", max_version="4") class TestDeprecatedRepairAPI(Tester): @@ -19,7 +24,7 @@ class TestDeprecatedRepairAPI(Tester): Test if deprecated repair JMX API runs with expected parameters """ - def force_repair_async_1_test(self): + def test_force_repair_async_1(self): """ test forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, @@ 
-28,15 +33,15 @@ class TestDeprecatedRepairAPI(Tester): """ opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,boolean,java.util.Collection,java.util.Collection,boolean,boolean,[Ljava.lang.String;)", ['ks', True, [], [], False, False, ["cf"]]) - self.assertEqual(opt["parallelism"], "parallel" if is_win() else "sequential", opt) - self.assertEqual(opt["primary_range"], "false", opt) - self.assertEqual(opt["incremental"], "true", opt) - self.assertEqual(opt["job_threads"], "1", opt) - self.assertEqual(opt["data_centers"], "[]", opt) - self.assertEqual(opt["hosts"], "[]", opt) - self.assertEqual(opt["column_families"], "[cf]", opt) - - def force_repair_async_2_test(self): + assert opt["parallelism"], "parallel" if is_win() else "sequential" == opt + assert opt["primary_range"], "false" == opt + assert opt["incremental"], "true" == opt + assert opt["job_threads"], "1" == opt + assert opt["data_centers"], "[]" == opt + assert opt["hosts"], "[]" == opt + assert opt["column_families"], "[cf]" == opt + + def test_force_repair_async_2(self): """ test forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters, @@ -45,15 +50,15 @@ class TestDeprecatedRepairAPI(Tester): """ opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,int,java.util.Collection,java.util.Collection,boolean,boolean,[Ljava.lang.String;)", ['ks', 1, [], [], True, True, []]) - self.assertEqual(opt["parallelism"], "parallel", opt) - self.assertEqual(opt["primary_range"], "true", opt) - self.assertEqual(opt["incremental"], "false", opt) - self.assertEqual(opt["job_threads"], "1", opt) - self.assertEqual(opt["data_centers"], "[]", opt) - self.assertEqual(opt["hosts"], "[]", opt) - self.assertEqual(opt["column_families"], "[]", opt) - - def force_repair_async_3_test(self): + assert opt["parallelism"], "parallel" == opt + assert opt["primary_range"], "true" == opt + assert opt["incremental"], "false" == opt + assert opt["job_threads"], "1" == opt + assert opt["data_centers"], "[]" == opt + assert opt["hosts"], "[]" == opt + assert opt["column_families"], "[]" == opt + + def test_force_repair_async_3(self): """ test forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, @@ -61,15 +66,15 @@ class TestDeprecatedRepairAPI(Tester): """ opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,boolean,boolean,boolean,boolean,[Ljava.lang.String;)", ['ks', False, False, False, False, ["cf"]]) - self.assertEqual(opt["parallelism"], "parallel", opt) - self.assertEqual(opt["primary_range"], "false", opt) - self.assertEqual(opt["incremental"], "true", opt) - self.assertEqual(opt["job_threads"], "1", opt) - self.assertEqual(opt["data_centers"], "[]", opt) - self.assertEqual(opt["hosts"], "[]", opt) - self.assertEqual(opt["column_families"], "[cf]", opt) - - def force_repair_range_async_1_test(self): + assert opt["parallelism"], "parallel" == opt + assert opt["primary_range"], "false" == opt + assert opt["incremental"], "true" == opt + assert opt["job_threads"], "1" == opt + assert opt["data_centers"], "[]" == opt + assert opt["hosts"], "[]" == opt + assert opt["column_families"], "[cf]" == opt + + def test_force_repair_range_async_1(self): """ test forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, @@ -79,16 +84,16 @@ class TestDeprecatedRepairAPI(Tester): """ opt = 
self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,boolean,java.util.Collection,java.util.Collection,boolean,[Ljava.lang.String;)", ["0", "1000", "ks", True, ["dc1"], [], False, ["cf"]]) - self.assertEqual(opt["parallelism"], "parallel" if is_win() else "sequential", opt) - self.assertEqual(opt["primary_range"], "false", opt) - self.assertEqual(opt["incremental"], "true", opt) - self.assertEqual(opt["job_threads"], "1", opt) - self.assertEqual(opt["data_centers"], "[dc1]", opt) - self.assertEqual(opt["hosts"], "[]", opt) - self.assertEqual(opt["ranges"], "1", opt) - self.assertEqual(opt["column_families"], "[cf]", opt) - - def force_repair_range_async_2_test(self): + assert opt["parallelism"], "parallel" if is_win() else "sequential" == opt + assert opt["primary_range"], "false" == opt + assert opt["incremental"], "true" == opt + assert opt["job_threads"], "1" == opt + assert opt["data_centers"], "[dc1]" == opt + assert opt["hosts"], "[]" == opt + assert opt["ranges"], "1" == opt + assert opt["column_families"], "[cf]" == opt + + def test_force_repair_range_async_2(self): """ test forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, @@ -98,16 +103,16 @@ class TestDeprecatedRepairAPI(Tester): """ opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,int,java.util.Collection,java.util.Collection,boolean,[Ljava.lang.String;)", ["0", "1000", "ks", 2, [], [], True, ["cf"]]) - self.assertEqual(opt["parallelism"], "parallel" if is_win() else "dc_parallel", opt) - self.assertEqual(opt["primary_range"], "false", opt) - self.assertEqual(opt["incremental"], "false", opt) - self.assertEqual(opt["job_threads"], "1", opt) - self.assertEqual(opt["data_centers"], "[]", opt) - self.assertEqual(opt["hosts"], "[]", opt) - self.assertEqual(opt["ranges"], "1", opt) - self.assertEqual(opt["column_families"], "[cf]", opt) - - def force_repair_range_async_3_test(self): + assert opt["parallelism"], "parallel" if is_win() else "dc_parallel" == opt + assert opt["primary_range"], "false" == opt + assert opt["incremental"], "false" == opt + assert opt["job_threads"], "1" == opt + assert opt["data_centers"], "[]" == opt + assert opt["hosts"], "[]" == opt + assert opt["ranges"], "1" == opt + assert opt["column_families"], "[cf]" == opt + + def test_force_repair_range_async_3(self): """ test forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, @@ -116,14 +121,14 @@ class TestDeprecatedRepairAPI(Tester): """ opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,boolean,boolean,boolean,[Ljava.lang.String;)", ["0", "1000", "ks", True, True, True, ["cf"]]) - self.assertEqual(opt["parallelism"], "parallel" if is_win() else "sequential", opt) - self.assertEqual(opt["primary_range"], "false", opt) - self.assertEqual(opt["incremental"], "false", opt) - self.assertEqual(opt["job_threads"], "1", opt) - self.assertEqual(opt["data_centers"], "[dc1]", opt) - self.assertEqual(opt["hosts"], "[]", opt) - self.assertEqual(opt["ranges"], "1", opt) - self.assertEqual(opt["column_families"], "[cf]", opt) + assert opt["parallelism"], "parallel" if is_win() else "sequential" == opt + assert opt["primary_range"], "false" == opt + assert opt["incremental"], "false" == opt + assert opt["job_threads"], "1" == opt + assert opt["data_centers"], "[dc1]" == opt + assert opt["hosts"], "[]" == opt + 
assert opt["ranges"], "1" == opt + assert opt["column_families"], "[cf]" == opt def _deprecated_repair_jmx(self, method, arguments): """ @@ -135,7 +140,7 @@ class TestDeprecatedRepairAPI(Tester): """ cluster = self.cluster - debug("Starting cluster..") + logger.debug("Starting cluster..") cluster.populate([1, 1]) node1, node2 = cluster.nodelist() remove_perf_disable_shared_mem(node1) @@ -152,7 +157,7 @@ class TestDeprecatedRepairAPI(Tester): mbean = make_mbean('db', 'StorageService') with JolokiaAgent(node1) as jmx: # assert repair runs and returns valid cmd number - self.assertEqual(jmx.execute_method(mbean, method, arguments), 1) + assert jmx.execute_method(mbean, method, arguments) == 1 # wait for log to start node1.watch_log_for("Starting repair command") # get repair parameters from the log @@ -165,7 +170,7 @@ class TestDeprecatedRepairAPI(Tester): line, m = line[0] if supports_pull_repair: - self.assertEqual(m.group("pullrepair"), "false", "Pull repair cannot be enabled through the deprecated API so the pull repair option should always be false.") + assert m.group("pullrepair"), "false" == "Pull repair cannot be enabled through the deprecated API so the pull repair option should always be false." return {"parallelism": m.group("parallelism"), "primary_range": m.group("pr"), http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/repair_tests/incremental_repair_test.py ---------------------------------------------------------------------- diff --git a/repair_tests/incremental_repair_test.py b/repair_tests/incremental_repair_test.py index e3017c2..4791e9a 100644 --- a/repair_tests/incremental_repair_test.py +++ b/repair_tests/incremental_repair_test.py @@ -1,8 +1,11 @@ import time +import pytest +import re +import logging + from datetime import datetime from collections import Counter, namedtuple from re import findall, compile -from unittest import skip from uuid import UUID, uuid1 from cassandra import ConsistencyLevel @@ -10,13 +13,14 @@ from cassandra.query import SimpleStatement from cassandra.metadata import Murmur3Token from ccmlib.common import is_win from ccmlib.node import Node, ToolError -from nose.plugins.attrib import attr -from dtest import Tester, debug, create_ks, create_cf +from dtest import Tester, create_ks, create_cf from tools.assertions import assert_almost_equal, assert_one from tools.data import insert_c1c2 -from tools.decorators import since, no_vnodes -from tools.misc import new_node +from tools.misc import new_node, ImmutableMapping + +since = pytest.mark.since +logger = logging.getLogger(__name__) class ConsistentState(object): @@ -29,7 +33,12 @@ class ConsistentState(object): class TestIncRepair(Tester): - ignore_log_patterns = (r'Can\'t send migration request: node.*is down',) + + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = ( + r'Can\'t send migration request: node.*is down' + ) @classmethod def _get_repaired_data(cls, node, keyspace): @@ -41,7 +50,7 @@ class TestIncRepair(Tester): out = node.run_sstablemetadata(keyspace=keyspace).stdout def matches(pattern): - return filter(None, [pattern.match(l) for l in out.split('\n')]) + return filter(None, [pattern.match(l) for l in out.decode("utf-8").split('\n')]) names = [m.group(1) for m in matches(_sstable_name)] repaired_times = [int(m.group(1)) for m in matches(_repaired_at)] @@ -57,37 +66,38 @@ class TestIncRepair(Tester): def assertNoRepairedSSTables(self, node, keyspace): """ Checks that no 
sstables are marked repaired, and none are marked pending repair """ data = self._get_repaired_data(node, keyspace) - self.assertTrue(all([t.repaired == 0 for t in data]), '{}'.format(data)) - self.assertTrue(all([t.pending_id is None for t in data])) + assert all([t.repaired == 0 for t in data]), '{}'.format(data) + assert all([t.pending_id is None for t in data]) def assertAllPendingRepairSSTables(self, node, keyspace, pending_id=None): """ Checks that no sstables are marked repaired, and all are marked pending repair """ data = self._get_repaired_data(node, keyspace) - self.assertTrue(all([t.repaired == 0 for t in data]), '{}'.format(data)) + assert all([t.repaired == 0 for t in data]), '{}'.format(data) if pending_id: - self.assertTrue(all([t.pending_id == pending_id for t in data])) + assert all([t.pending_id == pending_id for t in data]) else: - self.assertTrue(all([t.pending_id is not None for t in data])) + assert all([t.pending_id is not None for t in data]) def assertAllRepairedSSTables(self, node, keyspace): """ Checks that all sstables are marked repaired, and none are marked pending repair """ data = self._get_repaired_data(node, keyspace) - self.assertTrue(all([t.repaired > 0 for t in data]), '{}'.format(data)) - self.assertTrue(all([t.pending_id is None for t in data]), '{}'.format(data)) + assert all([t.repaired > 0 for t in data]), '{}'.format(data) + assert all([t.pending_id is None for t in data]), '{}'.format(data) def assertRepairedAndUnrepaired(self, node, keyspace): """ Checks that a node has both repaired and unrepaired sstables for a given keyspace """ data = self._get_repaired_data(node, keyspace) - self.assertTrue(any([t.repaired > 0 for t in data]), '{}'.format(data)) - self.assertTrue(any([t.repaired == 0 for t in data]), '{}'.format(data)) - self.assertTrue(all([t.pending_id is None for t in data]), '{}'.format(data)) + assert any([t.repaired > 0 for t in data]), '{}'.format(data) + assert any([t.repaired == 0 for t in data]), '{}'.format(data) + assert all([t.pending_id is None for t in data]), '{}'.format(data) @since('4.0') - def consistent_repair_test(self): - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() + def test_consistent_repair(self): + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'num_tokens': 1, + 'commitlog_sync_period_in_ms': 500}) + self.cluster.populate(3).start() + node1, node2, node3 = self.cluster.nodelist() # make data inconsistent between nodes session = self.patient_exclusive_cql_connection(node3) @@ -115,15 +125,15 @@ class TestIncRepair(Tester): node1.start(wait_other_notice=True, wait_for_binary_proto=True) # flush and check that no sstables are marked repaired - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): node.flush() self.assertNoRepairedSSTables(node, 'ks') session = self.patient_exclusive_cql_connection(node) results = list(session.execute("SELECT * FROM system.repairs")) - self.assertEqual(len(results), 0, str(results)) + assert len(results) == 0, str(results) # disable compaction so we can verify sstables are marked pending repair - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): node.nodetool('disableautocompaction ks tbl') node1.repair(options=['ks']) @@ -131,28 +141,28 @@ class TestIncRepair(Tester): # check that all participating 
nodes have the repair recorded in their system # table, that all nodes are listed as participants, and that all sstables are # (still) marked pending repair - expected_participants = {n.address() for n in cluster.nodelist()} - expected_participants_wp = {n.address_and_port() for n in cluster.nodelist()} + expected_participants = {n.address() for n in self.cluster.nodelist()} + expected_participants_wp = {n.address_and_port() for n in self.cluster.nodelist()} recorded_pending_ids = set() - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): session = self.patient_exclusive_cql_connection(node) results = list(session.execute("SELECT * FROM system.repairs")) - self.assertEqual(len(results), 1) + assert len(results) == 1 result = results[0] - self.assertEqual(set(result.participants), expected_participants) + assert set(result.participants) == expected_participants if hasattr(result, "participants_wp"): - self.assertEqual(set(result.participants_wp), expected_participants_wp) - self.assertEqual(result.state, ConsistentState.FINALIZED, "4=FINALIZED") + assert set(result.participants_wp) == expected_participants_wp + assert result.state, ConsistentState.FINALIZED == "4=FINALIZED" pending_id = result.parent_id self.assertAllPendingRepairSSTables(node, 'ks', pending_id) recorded_pending_ids.add(pending_id) - self.assertEqual(len(recorded_pending_ids), 1) + assert len(recorded_pending_ids) == 1 # sstables are compacted out of pending repair by a compaction # task, we disabled compaction earlier in the test, so here we # force the compaction and check that all sstables are promoted - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): node.nodetool('compact ks tbl') self.assertAllRepairedSSTables(node, 'ks') @@ -166,7 +176,7 @@ class TestIncRepair(Tester): ranges = {'\x00\x00\x00\x08K\xc2\xed\\<\xd3{X\x00\x00\x00\x08r\x04\x89[j\x81\xc4\xe6', '\x00\x00\x00\x08r\x04\x89[j\x81\xc4\xe6\x00\x00\x00\x08\xd8\xcdo\x9e\xcbl\x83\xd4', '\x00\x00\x00\x08\xd8\xcdo\x9e\xcbl\x83\xd4\x00\x00\x00\x08K\xc2\xed\\<\xd3{X'} - ranges = {buffer(b) for b in ranges} + ranges = {bytes(b, "Latin-1") for b in ranges} for node in self.cluster.nodelist(): session = self.patient_exclusive_cql_connection(node) @@ -177,8 +187,13 @@ class TestIncRepair(Tester): {str(n.address()) + ":7000" for n in self.cluster.nodelist()}, ranges, now, now, ConsistentState.REPAIRING]) # 2=REPAIRING + # as we faked repairs and inserted directly into system.repairs table, the current + # implementation in trunk (LocalSessions) only pulls the sessions via callbacks or + # from the system.repairs table once at startup. 
we need to stop and start the nodes + # as a way to force the repair sessions to get populated into the correct in-memory objects time.sleep(1) for node in self.cluster.nodelist(): + node.flush() node.stop(gently=False) for node in self.cluster.nodelist(): @@ -187,12 +202,13 @@ class TestIncRepair(Tester): return session_id @since('4.0') - def manual_session_fail_test(self): + def test_manual_session_fail(self): """ check manual failing of repair sessions via nodetool works properly """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'num_tokens': 1, + 'commitlog_sync_period_in_ms': 500}) + self.cluster.populate(3).start() + node1, node2, node3 = self.cluster.nodelist() # make data inconsistent between nodes session = self.patient_exclusive_cql_connection(node3) @@ -201,35 +217,37 @@ class TestIncRepair(Tester): for node in self.cluster.nodelist(): out = node.nodetool('repair_admin') - self.assertIn("no sessions", out.stdout) + assert "no sessions" in out.stdout session_id = self._make_fake_session('ks', 'tbl') for node in self.cluster.nodelist(): out = node.nodetool('repair_admin') lines = out.stdout.split('\n') - self.assertGreater(len(lines), 1) + assert len(lines) > 1 line = lines[1] - self.assertIn(str(session_id), line) - self.assertIn("REPAIRING", line) + assert re.match(str(session_id), line) + assert "REPAIRING" in line node1.nodetool("repair_admin --cancel {}".format(session_id)) for node in self.cluster.nodelist(): out = node.nodetool('repair_admin --all') lines = out.stdout.split('\n') - self.assertGreater(len(lines), 1) + assert len(lines) > 1 line = lines[1] - self.assertIn(str(session_id), line) - self.assertIn("FAILED", line) + assert re.match(str(session_id), line) + assert "FAILED" in line @since('4.0') - def manual_session_cancel_non_coordinator_failure_test(self): + def test_manual_session_cancel_non_coordinator_failure(self): """ check manual failing of repair sessions via a node other than the coordinator fails """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'num_tokens': 1, + 'commitlog_sync_period_in_ms': 500}) + + self.cluster.populate(3).start() + node1, node2, node3 = self.cluster.nodelist() # make data inconsistent between nodes session = self.patient_exclusive_cql_connection(node3) @@ -238,17 +256,17 @@ class TestIncRepair(Tester): for node in self.cluster.nodelist(): out = node.nodetool('repair_admin') - self.assertIn("no sessions", out.stdout) + assert "no sessions" in out.stdout session_id = self._make_fake_session('ks', 'tbl') for node in self.cluster.nodelist(): out = node.nodetool('repair_admin') lines = out.stdout.split('\n') - self.assertGreater(len(lines), 1) + assert len(lines) > 1 line = lines[1] - self.assertIn(str(session_id), line) - self.assertIn("REPAIRING", line) + assert re.match(str(session_id), line) + assert "REPAIRING" in line try: node2.nodetool("repair_admin --cancel {}".format(session_id)) @@ -260,18 +278,19 @@ class TestIncRepair(Tester): for node 
in self.cluster.nodelist(): out = node.nodetool('repair_admin') lines = out.stdout.split('\n') - self.assertGreater(len(lines), 1) + assert len(lines) > 1 line = lines[1] - self.assertIn(str(session_id), line) - self.assertIn("REPAIRING", line) + assert re.match(str(session_id), line) + assert "REPAIRING" in line @since('4.0') - def manual_session_force_cancel_test(self): + def test_manual_session_force_cancel(self): """ check manual failing of repair sessions via a non-coordinator works if the --force flag is set """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'num_tokens': 1, + 'commitlog_sync_period_in_ms': 500}) + self.cluster.populate(3).start() + node1, node2, node3 = self.cluster.nodelist() # make data inconsistent between nodes session = self.patient_exclusive_cql_connection(node3) @@ -280,29 +299,29 @@ class TestIncRepair(Tester): for node in self.cluster.nodelist(): out = node.nodetool('repair_admin') - self.assertIn("no sessions", out.stdout) + assert "no sessions" in out.stdout session_id = self._make_fake_session('ks', 'tbl') for node in self.cluster.nodelist(): out = node.nodetool('repair_admin') lines = out.stdout.split('\n') - self.assertGreater(len(lines), 1) + assert len(lines) > 1 line = lines[1] - self.assertIn(str(session_id), line) - self.assertIn("REPAIRING", line) + assert re.match(str(session_id), line) + assert "REPAIRING" in line node2.nodetool("repair_admin --cancel {} --force".format(session_id)) for node in self.cluster.nodelist(): out = node.nodetool('repair_admin --all') lines = out.stdout.split('\n') - self.assertGreater(len(lines), 1) + assert len(lines) > 1 line = lines[1] - self.assertIn(str(session_id), line) - self.assertIn("FAILED", line) + assert re.match(str(session_id), line) + assert "FAILED" in line - def sstable_marking_test(self): + def test_sstable_marking(self): """ * Launch a three node cluster * Stop node3 @@ -311,11 +330,10 @@ class TestIncRepair(Tester): * Issue an incremental repair, and wait for it to finish * Run sstablemetadata on every node, assert that all sstables are marked as repaired """ - cluster = self.cluster # hinted handoff can create SSTable that we don't need after node3 restarted - cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'}) + self.cluster.populate(3).start() + node1, node2, node3 = self.cluster.nodelist() node3.stop(gently=True) @@ -331,20 +349,20 @@ class TestIncRepair(Tester): node3.watch_log_for("Initializing keyspace1.standard1", filename=log_file) # wait for things to settle before starting repair time.sleep(1) - if cluster.version() >= "2.2": + if self.cluster.version() >= "2.2": node3.repair() else: node3.nodetool("repair -par -inc") - if cluster.version() >= '4.0': + if self.cluster.version() >= '4.0': # sstables are compacted out of pending repair by a compaction - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): node.nodetool('compact keyspace1 standard1') - for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in cluster.nodelist()): - 
self.assertNotIn('Repaired at: 0', out) + for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in self.cluster.nodelist()): + assert 'Repaired at: 0' not in out.decode("utf-8") - def multiple_repair_test(self): + def test_multiple_repair(self): """ * Launch a three node cluster * Create a keyspace with RF 3 and a table @@ -370,21 +388,21 @@ class TestIncRepair(Tester): create_ks(session, 'ks', 3) create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'}) - debug("insert data") + logger.debug("insert data") - insert_c1c2(session, keys=range(1, 50), consistency=ConsistencyLevel.ALL) + insert_c1c2(session, keys=list(range(1, 50)), consistency=ConsistencyLevel.ALL) node1.flush() - debug("bringing down node 3") + logger.debug("bringing down node 3") node3.flush() node3.stop(gently=False) - debug("inserting additional data into node 1 and 2") - insert_c1c2(session, keys=range(50, 100), consistency=ConsistencyLevel.TWO) + logger.debug("inserting additional data into node 1 and 2") + insert_c1c2(session, keys=list(range(50, 100)), consistency=ConsistencyLevel.TWO) node1.flush() node2.flush() - debug("restarting and repairing node 3") + logger.debug("restarting and repairing node 3") node3.start(wait_for_binary_proto=True) if cluster.version() >= "2.2": @@ -397,15 +415,15 @@ class TestIncRepair(Tester): if is_win: time.sleep(2) - debug("stopping node 2") + logger.debug("stopping node 2") node2.stop(gently=False) - debug("inserting data in nodes 1 and 3") - insert_c1c2(session, keys=range(100, 150), consistency=ConsistencyLevel.TWO) + logger.debug("inserting data in nodes 1 and 3") + insert_c1c2(session, keys=list(range(100, 150)), consistency=ConsistencyLevel.TWO) node1.flush() node3.flush() - debug("start and repair node 2") + logger.debug("start and repair node 2") node2.start(wait_for_binary_proto=True) if cluster.version() >= "2.2": @@ -413,7 +431,7 @@ class TestIncRepair(Tester): else: node2.nodetool("repair -par -inc") - debug("replace node and check data integrity") + logger.debug("replace node and check data integrity") node3.stop(gently=False) node5 = Node('node5', cluster, True, ('127.0.0.5', 9160), ('127.0.0.5', 7000), '7500', '0', None, ('127.0.0.5', 9042)) cluster.add(node5, False) @@ -421,7 +439,7 @@ class TestIncRepair(Tester): assert_one(session, "SELECT COUNT(*) FROM ks.cf LIMIT 200", [149]) - def sstable_repairedset_test(self): + def test_sstable_repairedset(self): """ * Launch a two node cluster * Insert data with stress @@ -437,10 +455,9 @@ class TestIncRepair(Tester): * Run sstablemetadata on both nodes again, pipe to a new file * Verify repairs occurred and repairedAt was updated """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) - cluster.populate(2).start() - node1, node2 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'}) + self.cluster.populate(2).start() + node1, node2 = self.cluster.nodelist() node1.stress(['write', 'n=10K', 'no-warmup', '-schema', 'replication(factor=2)', 'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)', '-rate', 'threads=50']) node1.flush() @@ -451,53 +468,57 @@ class TestIncRepair(Tester): node2.run_sstablerepairedset(keyspace='keyspace1') node2.start(wait_for_binary_proto=True) - initialOut1 = node1.run_sstablemetadata(keyspace='keyspace1').stdout - initialOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout + initialOut1 = 
node1.run_sstablemetadata(keyspace='keyspace1').stdout.decode("utf-8") + initialOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout.decode("utf-8") matches = findall('(?<=Repaired at:).*', '\n'.join([initialOut1, initialOut2])) - debug("Repair timestamps are: {}".format(matches)) + logger.debug("Repair timestamps are: {}".format(matches)) uniquematches = set(matches) matchcount = Counter(matches) - self.assertGreaterEqual(len(uniquematches), 2, uniquematches) + assert len(uniquematches) >= 2, uniquematches - self.assertGreaterEqual(max(matchcount), 1, matchcount) + assert len(max(matchcount)) >= 1, matchcount - self.assertIn('Repaired at: 0', '\n'.join([initialOut1, initialOut2])) + assert re.search('Repaired at: 0', '\n'.join([initialOut1, initialOut2])) node1.stop() node2.stress(['write', 'n=15K', 'no-warmup', '-schema', 'replication(factor=2)']) node2.flush() node1.start(wait_for_binary_proto=True) - if cluster.version() >= "2.2": + if self.cluster.version() >= "2.2": node1.repair() else: node1.nodetool("repair -par -inc") - if cluster.version() >= '4.0': + if self.cluster.version() >= '4.0': # sstables are compacted out of pending repair by a compaction - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): node.nodetool('compact keyspace1 standard1') finalOut1 = node1.run_sstablemetadata(keyspace='keyspace1').stdout + if not isinstance(finalOut1, str): + finalOut1 = finalOut1.decode("utf-8") finalOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout + if not isinstance(finalOut2, str): + finalOut2 = finalOut2.decode("utf-8") matches = findall('(?<=Repaired at:).*', '\n'.join([finalOut1, finalOut2])) - debug(matches) + logger.debug(matches) uniquematches = set(matches) matchcount = Counter(matches) - self.assertGreaterEqual(len(uniquematches), 2) + assert len(uniquematches) >= 2 - self.assertGreaterEqual(max(matchcount), 2) + assert len(max(matchcount)) >= 2 - self.assertNotIn('Repaired at: 0', '\n'.join([finalOut1, finalOut2])) + assert not re.search('Repaired at: 0', '\n'.join([finalOut1, finalOut2])) - def compaction_test(self): + def test_compaction(self): """ Test we can major compact after an incremental repair * Launch a three node cluster @@ -543,22 +564,22 @@ class TestIncRepair(Tester): assert_one(session, "select val from tab where key =" + str(x), [1]) @since("2.2") - def multiple_full_repairs_lcs_test(self): + def test_multiple_full_repairs_lcs(self): """ @jira_ticket CASSANDRA-11172 - repeated full repairs should not cause infinite loop in getNextBackgroundTask """ cluster = self.cluster cluster.populate(2).start(wait_for_binary_proto=True) node1, node2 = cluster.nodelist() - for x in xrange(0, 10): + for x in range(0, 10): node1.stress(['write', 'n=100k', 'no-warmup', '-rate', 'threads=10', '-schema', 'compaction(strategy=LeveledCompactionStrategy,sstable_size_in_mb=10)', 'replication(factor=2)']) cluster.flush() cluster.wait_for_compactions() node1.nodetool("repair -full keyspace1 standard1") - @attr('long') - @skip('hangs CI') - def multiple_subsequent_repair_test(self): + @pytest.mark.env("long") + @pytest.mark.skip(reason='hangs CI') + def test_multiple_subsequent_repair(self): """ @jira_ticket CASSANDRA-8366 @@ -576,55 +597,55 @@ class TestIncRepair(Tester): cluster.populate(3).start() node1, node2, node3 = cluster.nodelist() - debug("Inserting data with stress") + logger.debug("Inserting data with stress") node1.stress(['write', 'n=5M', 'no-warmup', '-rate', 'threads=10', '-schema', 'replication(factor=3)']) - debug("Flushing 
nodes") + logger.debug("Flushing nodes") cluster.flush() - debug("Waiting compactions to finish") + logger.debug("Waiting compactions to finish") cluster.wait_for_compactions() if self.cluster.version() >= '2.2': - debug("Repairing node1") + logger.debug("Repairing node1") node1.nodetool("repair") - debug("Repairing node2") + logger.debug("Repairing node2") node2.nodetool("repair") - debug("Repairing node3") + logger.debug("Repairing node3") node3.nodetool("repair") else: - debug("Repairing node1") + logger.debug("Repairing node1") node1.nodetool("repair -par -inc") - debug("Repairing node2") + logger.debug("Repairing node2") node2.nodetool("repair -par -inc") - debug("Repairing node3") + logger.debug("Repairing node3") node3.nodetool("repair -par -inc") - # Using "print" instead of debug() here is on purpose. The compactions + # Using "print" instead of logger.debug() here is on purpose. The compactions # take a long time and don't print anything by default, which can result # in the test being timed out after 20 minutes. These print statements # prevent it from being timed out. - print "compacting node1" + print("compacting node1") node1.compact() - print "compacting node2" + print("compacting node2") node2.compact() - print "compacting node3" + print("compacting node3") node3.compact() # wait some time to be sure the load size is propagated between nodes - debug("Waiting for load size info to be propagated between nodes") + logger.debug("Waiting for load size info to be propagated between nodes") time.sleep(45) - load_size_in_kb = float(sum(map(lambda n: n.data_size(), [node1, node2, node3]))) + load_size_in_kb = float(sum([n.data_size() for n in [node1, node2, node3]])) load_size = load_size_in_kb / 1024 / 1024 - debug("Total Load size: {}GB".format(load_size)) + logger.debug("Total Load size: {}GB".format(load_size)) # There is still some overhead, but it's lot better. We tolerate 25%. 
expected_load_size = 4.5 # In GB assert_almost_equal(load_size, expected_load_size, error=0.25) - @attr('resource-intensive') - def sstable_marking_test_not_intersecting_all_ranges(self): + @pytest.mark.resource_intensive + def test_sstable_marking_not_intersecting_all_ranges(self): """ @jira_ticket CASSANDRA-10299 * Launch a four node cluster @@ -636,21 +657,21 @@ class TestIncRepair(Tester): cluster.populate(4).start(wait_for_binary_proto=True) node1, node2, node3, node4 = cluster.nodelist() - debug("Inserting data with stress") + logger.debug("Inserting data with stress") node1.stress(['write', 'n=3', 'no-warmup', '-rate', 'threads=1', '-schema', 'replication(factor=3)']) - debug("Flushing nodes") + logger.debug("Flushing nodes") cluster.flush() repair_options = '' if self.cluster.version() >= '2.2' else '-inc -par' - debug("Repairing node 1") + logger.debug("Repairing node 1") node1.nodetool("repair {}".format(repair_options)) - debug("Repairing node 2") + logger.debug("Repairing node 2") node2.nodetool("repair {}".format(repair_options)) - debug("Repairing node 3") + logger.debug("Repairing node 3") node3.nodetool("repair {}".format(repair_options)) - debug("Repairing node 4") + logger.debug("Repairing node 4") node4.nodetool("repair {}".format(repair_options)) if cluster.version() >= '4.0': @@ -659,16 +680,16 @@ class TestIncRepair(Tester): node.nodetool('compact keyspace1 standard1') for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in cluster.nodelist() if len(node.get_sstables('keyspace1', 'standard1')) > 0): - self.assertNotIn('Repaired at: 0', out) + assert 'Repaired at: 0' not in out - @no_vnodes() + @pytest.mark.no_vnodes @since('4.0') - def move_test(self): + def test_move(self): """ Test repaired data remains in sync after a move """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500}) - cluster.populate(4, tokens=[0, 2**32, 2**48, -(2**32)]).start() - node1, node2, node3, node4 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'commitlog_sync_period_in_ms': 500}) + self.cluster.populate(4, tokens=[0, 2**32, 2**48, -(2**32)]).start() + node1, node2, node3, node4 = self.cluster.nodelist() session = self.patient_exclusive_cql_connection(node3) session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}") @@ -686,25 +707,25 @@ class TestIncRepair(Tester): session.execute(stmt, (v, v)) # everything should be in sync - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): result = node.repair(options=['ks', '--validate']) - self.assertIn("Repaired data is in sync", result.stdout) + assert "Repaired data is in sync" in result.stdout node2.nodetool('move {}'.format(2**16)) # everything should still be in sync - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): result = node.repair(options=['ks', '--validate']) - self.assertIn("Repaired data is in sync", result.stdout) + assert "Repaired data is in sync" in result.stdout - @no_vnodes() + @pytest.mark.no_vnodes @since('4.0') - def decommission_test(self): + def test_decommission(self): """ Test repaired data remains in sync after a decommission """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500}) - cluster.populate(4).start() - node1, node2, node3, node4 = 
cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'commitlog_sync_period_in_ms': 500}) + self.cluster.populate(4).start() + node1, node2, node3, node4 = self.cluster.nodelist() session = self.patient_exclusive_cql_connection(node3) session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}") @@ -722,25 +743,25 @@ class TestIncRepair(Tester): session.execute(stmt, (v, v)) # everything should be in sync - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): result = node.repair(options=['ks', '--validate']) - self.assertIn("Repaired data is in sync", result.stdout) + assert "Repaired data is in sync" in result.stdout node2.nodetool('decommission') # everything should still be in sync for node in [node1, node3, node4]: result = node.repair(options=['ks', '--validate']) - self.assertIn("Repaired data is in sync", result.stdout) + assert "Repaired data is in sync" in result.stdout - @no_vnodes() + @pytest.mark.no_vnodes @since('4.0') - def bootstrap_test(self): + def test_bootstrap(self): """ Test repaired data remains in sync after a bootstrap """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500}) - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'commitlog_sync_period_in_ms': 500}) + self.cluster.populate(3).start() + node1, node2, node3 = self.cluster.nodelist() session = self.patient_exclusive_cql_connection(node3) session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}") @@ -760,27 +781,28 @@ class TestIncRepair(Tester): # everything should be in sync for node in [node1, node2, node3]: result = node.repair(options=['ks', '--validate']) - self.assertIn("Repaired data is in sync", result.stdout) + assert "Repaired data is in sync" in result.stdout node4 = new_node(self.cluster) node4.start(wait_for_binary_proto=True) - self.assertEqual(len(self.cluster.nodelist()), 4) + assert len(self.cluster.nodelist()) == 4 # everything should still be in sync for node in self.cluster.nodelist(): result = node.repair(options=['ks', '--validate']) - self.assertIn("Repaired data is in sync", result.stdout) + assert "Repaired data is in sync" in result.stdout @since('4.0') - def force_test(self): + def test_force(self): """ forcing an incremental repair should incrementally repair any nodes that are up, but should not promote the sstables to repaired """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'num_tokens': 1, + 'commitlog_sync_period_in_ms': 500}) + self.cluster.populate(3).start() + node1, node2, node3 = self.cluster.nodelist() session = self.patient_exclusive_cql_connection(node3) session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") @@ -793,7 +815,7 @@ class TestIncRepair(Tester): node2.stop() # repair should fail because node2 is down - with self.assertRaises(ToolError): + with pytest.raises(ToolError): node1.repair(options=['ks']) # 
run with force flag @@ -804,15 +826,16 @@ class TestIncRepair(Tester): self.assertNoRepairedSSTables(node2, 'ks') @since('4.0') - def hosts_test(self): + def test_hosts(self): """ running an incremental repair with hosts specified should incrementally repair the given nodes, but should not promote the sstables to repaired """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'num_tokens': 1, + 'commitlog_sync_period_in_ms': 500}) + self.cluster.populate(3).start() + node1, node2, node3 = self.cluster.nodelist() session = self.patient_exclusive_cql_connection(node3) session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") @@ -830,18 +853,17 @@ class TestIncRepair(Tester): self.assertNoRepairedSSTables(node2, 'ks') @since('4.0') - def subrange_test(self): + def test_subrange(self): """ running an incremental repair with hosts specified should incrementally repair the given nodes, but should not promote the sstables to repaired """ - cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, - 'num_tokens': 1, - 'commitlog_sync_period_in_ms': 500, - 'partitioner': 'org.apache.cassandra.dht.Murmur3Partitioner'}) - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'num_tokens': 1, + 'commitlog_sync_period_in_ms': 500, + 'partitioner': 'org.apache.cassandra.dht.Murmur3Partitioner'}) + self.cluster.populate(3).start() + node1, node2, node3 = self.cluster.nodelist() session = self.patient_exclusive_cql_connection(node3) session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") @@ -851,12 +873,12 @@ class TestIncRepair(Tester): for i in range(10): session.execute(stmt, (i, i)) - for node in cluster.nodelist(): + for node in self.cluster.nodelist(): node.flush() self.assertNoRepairedSSTables(node, 'ks') # only repair the partition k=0 - token = Murmur3Token.from_key(str(bytearray([0, 0, 0, 0]))) + token = Murmur3Token.from_key(bytes([0, 0, 0, 0])) # import ipdb; ipdb.set_trace() # run with force flag node1.repair(options=['ks', '-st', str(token.value - 1), '-et', str(token.value)]) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/repair_tests/preview_repair_test.py ---------------------------------------------------------------------- diff --git a/repair_tests/preview_repair_test.py b/repair_tests/preview_repair_test.py index 86627ab..ee5a38d 100644 --- a/repair_tests/preview_repair_test.py +++ b/repair_tests/preview_repair_test.py @@ -1,23 +1,25 @@ +import pytest import time from cassandra import ConsistencyLevel from cassandra.query import SimpleStatement from dtest import Tester -from tools.decorators import no_vnodes, since + +since = pytest.mark.since @since('4.0') -class PreviewRepairTest(Tester): +class TestPreviewRepair(Tester): def assert_no_repair_history(self, session): rows = session.execute("select * from system_distributed.repair_history") - self.assertEqual(rows.current_rows, []) + assert rows.current_rows == [] rows = session.execute("select * from system_distributed.parent_repair_history") - 
self.assertEqual(rows.current_rows, []) + assert rows.current_rows == [] - @no_vnodes() - def preview_test(self): + @pytest.mark.no_vnodes + def test_preview(self): """ Test that preview correctly detects out of sync data """ cluster = self.cluster cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500}) @@ -30,7 +32,7 @@ class PreviewRepairTest(Tester): # everything should be in sync result = node1.repair(options=['ks', '--preview']) - self.assertIn("Previewed data was in sync", result.stdout) + assert "Previewed data was in sync" in result.stdout self.assert_no_repair_history(session) # make data inconsistent between nodes @@ -57,16 +59,16 @@ class PreviewRepairTest(Tester): # data should not be in sync for full and unrepaired previews result = node1.repair(options=['ks', '--preview']) - self.assertIn("Total estimated streaming", result.stdout) - self.assertNotIn("Previewed data was in sync", result.stdout) + assert "Total estimated streaming" in result.stdout + assert "Previewed data was in sync" not in result.stdout result = node1.repair(options=['ks', '--preview', '--full']) - self.assertIn("Total estimated streaming", result.stdout) - self.assertNotIn("Previewed data was in sync", result.stdout) + assert "Total estimated streaming" in result.stdout + assert "Previewed data was in sync" not in result.stdout # repaired data should be in sync anyway result = node1.repair(options=['ks', '--validate']) - self.assertIn("Repaired data is in sync", result.stdout) + assert "Repaired data is in sync" in result.stdout self.assert_no_repair_history(session) @@ -77,10 +79,10 @@ class PreviewRepairTest(Tester): # ...and everything should be in sync result = node1.repair(options=['ks', '--preview']) - self.assertIn("Previewed data was in sync", result.stdout) + assert "Previewed data was in sync" in result.stdout result = node1.repair(options=['ks', '--preview', '--full']) - self.assertIn("Previewed data was in sync", result.stdout) + assert "Previewed data was in sync" in result.stdout result = node1.repair(options=['ks', '--validate']) - self.assertIn("Repaired data is in sync", result.stdout) + assert "Repaired data is in sync" in result.stdout
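
The incremental repair assertions near the top of this patch work by grepping decoded sstablemetadata output for its "Repaired at" field and tallying the values with a Counter. Below is a minimal, self-contained sketch of that parsing step; the regex and the Counter usage mirror the test, while the helper name and the sample output string are illustrative only.

    import re
    from collections import Counter

    def repaired_at_values(metadata_output):
        # One "Repaired at: <timestamp>" entry is expected per sstable summary.
        return [m.strip() for m in re.findall('(?<=Repaired at:).*', metadata_output)]

    # Illustrative output for two sstables: one unrepaired (0), one repaired.
    sample = "Repaired at: 0\nRepaired at: 1511764800000\n"
    values = repaired_at_values(sample)
    counts = Counter(values)
    assert len(set(values)) >= 2, values   # at least two distinct repair times seen
    assert '0' in counts                   # some sstables remain unrepaired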
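Several hunks above also guard the run_sstablemetadata(...).stdout value with an isinstance check before decoding, since under Python 3 the captured output may arrive as either bytes or str depending on the ccm version in use. That guard could be written once as a helper; the function name here is mine, not something the patch introduces.

    def ensure_str(output, encoding="utf-8"):
        # Decode only when the subprocess handed back raw bytes.
        if isinstance(output, bytes):
            return output.decode(encoding)
        return output

    assert ensure_str(b"Repaired at: 0") == "Repaired at: 0"
    assert ensure_str("Repaired at: 0") == "Repaired at: 0"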
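test_move, test_decommission and test_bootstrap each repeat the same three-line check: run nodetool repair with --validate on every node and assert that the repaired data sets agree. A possible helper for that loop is sketched below; it assumes the ccm Node.repair() call and result.stdout attribute already used in the patch, and the helper name is hypothetical.

    def assert_repaired_data_in_sync(nodes, keyspace='ks'):
        # A validation repair compares repaired data across replicas without streaming.
        for node in nodes:
            result = node.repair(options=[keyspace, '--validate'])
            assert "Repaired data is in sync" in result.stdout, result.stdout

    # e.g. after node2.nodetool('move {}'.format(2**16)):
    #     assert_repaired_data_in_sync(cluster.nodelist())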
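Finally, test_subrange narrows its repair to the single token owning partition k=0 by computing the key's Murmur3 token and passing a one-token range via -st/-et. The fragment below restates that calculation on its own; it assumes the Python driver's Murmur3Token class from cassandra.metadata, since the import line itself sits outside the quoted hunks.

    from cassandra.metadata import Murmur3Token

    # k=0 serialized as the four zero bytes the test uses for its key.
    token = Murmur3Token.from_key(bytes([0, 0, 0, 0]))

    # Start-exclusive / end-inclusive bounds covering exactly that token.
    start, end = str(token.value - 1), str(token.value)
    # node1.repair(options=['ks', '-st', start, '-et', end])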