http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/repair_tests/repair_test.py
----------------------------------------------------------------------
diff --git a/repair_tests/repair_test.py b/repair_tests/repair_test.py
index 238871f..59910e0 100644
--- a/repair_tests/repair_test.py
+++ b/repair_tests/repair_test.py
@@ -3,18 +3,21 @@ import os.path
 import threading
 import time
 import re
+import pytest
+import logging
+
 from collections import namedtuple
 from threading import Thread
-from unittest import skip, skipIf
 
 from cassandra import ConsistencyLevel
 from cassandra.query import SimpleStatement
 from ccmlib.node import ToolError
-from nose.plugins.attrib import attr
 
-from dtest import CASSANDRA_VERSION_FROM_BUILD, FlakyRetryPolicy, Tester, debug, create_ks, create_cf
+from dtest import CASSANDRA_VERSION_FROM_BUILD, FlakyRetryPolicy, Tester, create_ks, create_cf
 from tools.data import insert_c1c2, query_c1c2
-from tools.decorators import no_vnodes, since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 def _repair_options(version, ks='', cf=None, sequential=True):
@@ -46,7 +49,6 @@ def _repair_options(version, ks='', cf=None, sequential=True):
 
 
 class BaseRepairTest(Tester):
-    __test__ = False
 
     def check_rows_on_node(self, node_to_check, rows, found=None, 
missings=None, restart=True):
         """
@@ -64,14 +66,14 @@ class BaseRepairTest(Tester):
             missings = []
         stopped_nodes = []
 
-        for node in self.cluster.nodes.values():
+        for node in list(self.cluster.nodes.values()):
             if node.is_running() and node is not node_to_check:
                 stopped_nodes.append(node)
                 node.stop(wait_other_notice=True)
 
         session = self.patient_exclusive_cql_connection(node_to_check, 'ks')
-        result = list(session.execute("SELECT * FROM cf LIMIT {}".format(rows 
* 2)))
-        self.assertEqual(len(result), rows)
+        result = list(session.execute("SELECT * FROM cf LIMIT {}".format(rows 
* 2), timeout=10))
+        assert len(result) == rows
 
         for k in found:
             query_c1c2(session, k, ConsistencyLevel.ONE)
@@ -79,7 +81,7 @@ class BaseRepairTest(Tester):
         for k in missings:
             query = SimpleStatement("SELECT c1, c2 FROM cf WHERE 
key='k{}'".format(k), consistency_level=ConsistencyLevel.ONE)
             res = list(session.execute(query))
-            self.assertEqual(len(filter(lambda x: len(x) != 0, res)), 0, res)
+            assert len([x for x in res if len(x) != 0]) == 0, res
 
         if restart:
             for node in stopped_nodes:
@@ -92,7 +94,7 @@ class BaseRepairTest(Tester):
         # interfere with the test (this must be after the populate)
         cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False})
         cluster.set_batch_commitlog(enabled=True)
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate(3).start()
         node1, node2, node3 = cluster.nodelist()
 
@@ -101,13 +103,13 @@ class BaseRepairTest(Tester):
         create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 
'text'})
 
         # Insert 1000 keys, kill node 3, insert 1 key, restart node 3, insert 
1000 more keys
-        debug("Inserting data...")
+        logger.debug("Inserting data...")
         insert_c1c2(session, n=1000, consistency=ConsistencyLevel.ALL)
         node3.flush()
         node3.stop(wait_other_notice=True)
         insert_c1c2(session, keys=(1000, ), consistency=ConsistencyLevel.TWO)
         node3.start(wait_other_notice=True, wait_for_binary_proto=True)
-        insert_c1c2(session, keys=range(1001, 2001), 
consistency=ConsistencyLevel.ALL)
+        insert_c1c2(session, keys=list(range(1001, 2001)), 
consistency=ConsistencyLevel.ALL)
 
         cluster.flush()
 
@@ -116,46 +118,46 @@ class BaseRepairTest(Tester):
         node1, node2, node3 = cluster.nodelist()
 
         # Verify that node3 has only 2000 keys
-        debug("Checking data on node3...")
+        logger.debug("Checking data on node3...")
         self.check_rows_on_node(node3, 2000, missings=[1000])
 
         # Verify that node1 has 2001 keys
-        debug("Checking data on node1...")
+        logger.debug("Checking data on node1...")
         self.check_rows_on_node(node1, 2001, found=[1000])
 
         # Verify that node2 has 2001 keys
-        debug("Checking data on node2...")
+        logger.debug("Checking data on node2...")
         self.check_rows_on_node(node2, 2001, found=[1000])
 
         time.sleep(10)  # see CASSANDRA-4373
         # Run repair
         start = time.time()
-        debug("starting repair...")
+        logger.debug("starting repair...")
         node1.repair(_repair_options(self.cluster.version(), ks='ks', 
sequential=sequential))
-        debug("Repair time: {end}".format(end=time.time() - start))
+        logger.debug("Repair time: {end}".format(end=time.time() - start))
 
         # Validate that only one range was transfered
         out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync".format(cluster.address_regex(), cluster.address_regex()))
 
-        self.assertEqual(len(out_of_sync_logs), 2, "Lines matching: " + 
str([elt[0] for elt in out_of_sync_logs]))
+        assert len(out_of_sync_logs) == 2, "Lines matching: " + str([elt[0] 
for elt in out_of_sync_logs])
 
         valid_out_of_sync_pairs = [{node1.address(), node3.address()},
                                    {node2.address(), node3.address()}]
 
         for line, m in out_of_sync_logs:
             num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), 
{m.group(1), m.group(2)}
-            self.assertEqual(int(num_out_of_sync_ranges), 1, "Expecting 1 
range out of sync for {}, but saw {}".format(out_of_sync_nodes, line))
-            self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, 
str(out_of_sync_nodes))
+            assert int(num_out_of_sync_ranges) == 1, \
+                "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges)
+            assert out_of_sync_nodes in valid_out_of_sync_pairs, str(out_of_sync_nodes)
 
         # Check node3 now has the key
         self.check_rows_on_node(node3, 2001, found=[1000], restart=False)
 
 
 class TestRepair(BaseRepairTest):
-    __test__ = True
 
-    @since('2.2.1', '4')
-    def no_anticompaction_after_dclocal_repair_test(self):
+    @since('2.2.1', max_version='4')
+    def test_no_anticompaction_after_dclocal_repair(self):
         """
         * Launch a four node, two DC cluster
         * Start a -local repair on node1 in dc1
@@ -166,44 +168,44 @@ class TestRepair(BaseRepairTest):
         @jira_ticket CASSANDRA-10422
         """
         cluster = self.cluster
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate([2, 2]).start(wait_for_binary_proto=True)
         node1_1, node2_1, node1_2, node2_2 = cluster.nodelist()
         node1_1.stress(stress_options=['write', 'n=50K', 'no-warmup', 
'cl=ONE', '-schema', 'replication(factor=4)', '-rate', 'threads=50'])
         node1_1.nodetool("repair -local keyspace1 standard1")
-        self.assertTrue(node1_1.grep_log("Not a global repair"))
-        self.assertTrue(node2_1.grep_log("Not a global repair"))
+        assert node1_1.grep_log("Not a global repair")
+        assert node2_1.grep_log("Not a global repair")
 
         # dc2 should not see these messages:
-        self.assertFalse(node1_2.grep_log("Not a global repair"))
-        self.assertFalse(node2_2.grep_log("Not a global repair"))
+        assert not node1_2.grep_log("Not a global repair")
+        assert not node2_2.grep_log("Not a global repair")
 
         # and no nodes should do anticompaction:
         for node in cluster.nodelist():
-            self.assertFalse(node.grep_log("Starting anticompaction"))
+            assert not node.grep_log("Starting anticompaction")
 
-    @skipIf(CASSANDRA_VERSION_FROM_BUILD == '3.9', "Test doesn't run on 3.9")
-    def nonexistent_table_repair_test(self):
+    @pytest.mark.skipif(CASSANDRA_VERSION_FROM_BUILD == '3.9', reason="Test 
doesn't run on 3.9")
+    def test_nonexistent_table_repair(self):
         """
         * Check that repairing a non-existent table fails
         @jira_ticket CASSANDRA-12279
         """
-        self.ignore_log_patterns = [r'Unknown keyspace/cf pair']
+        self.fixture_dtest_setup.ignore_log_patterns = [r'Unknown keyspace/cf 
pair']
         cluster = self.cluster
-        debug('Starting nodes')
+        logger.debug('Starting nodes')
         cluster.populate(2).start(wait_for_binary_proto=True)
         node1, _ = cluster.nodelist()
-        debug('Creating keyspace and tables')
+        logger.debug('Creating keyspace and tables')
         node1.stress(stress_options=['write', 'n=1', 'no-warmup',
                                      'cl=ONE', '-schema', 
'replication(factor=2)',
                                      '-rate', 'threads=1'])
-        debug('Repairing non-existent table')
+        logger.debug('Repairing non-existent table')
 
         def repair_non_existent_table():
             global nodetool_error
             try:
                 node1.nodetool('repair keyspace1 standard2')
-            except Exception, e:
+            except Exception as e:
                 nodetool_error = e
 
         # Launch in a external thread so it does not hang process
@@ -211,20 +213,20 @@ class TestRepair(BaseRepairTest):
         t.start()
 
         t.join(timeout=60)
-        self.assertFalse(t.isAlive(), 'Repair thread on inexistent table is 
still running')
+        assert not t.is_alive(), 'Repair thread on inexistent table is still 
running'
 
         if self.cluster.version() >= '2.2':
             node1.watch_log_for("Unknown keyspace/cf pair", timeout=60)
         # Repair only finishes with error status after CASSANDRA-12508 on 3.0+
         if self.cluster.version() >= '3.0':
-            self.assertTrue('nodetool_error' in globals() and 
isinstance(nodetool_error, ToolError),
-                            'Repair thread on inexistent table did not throw 
exception')
-            debug(nodetool_error.message)
-            self.assertTrue('Unknown keyspace/cf pair' in 
nodetool_error.message,
-                            'Repair thread on inexistent table did not detect 
inexistent table.')
+            assert 'nodetool_error' in globals() and isinstance(nodetool_error, ToolError), \
+                'Repair thread on inexistent table did not throw exception'
+            logger.debug(repr(nodetool_error))
+            assert 'Unknown keyspace/cf pair' in repr(nodetool_error), \
+                'Repair thread on inexistent table did not detect inexistent table.'
 
-    @since('2.2.1', '4')
-    def no_anticompaction_after_hostspecific_repair_test(self):
+    @since('2.2.1', max_version='4')
+    def test_no_anticompaction_after_hostspecific_repair(self):
         """
         * Launch a four node, two DC cluster
         * Start a repair on all nodes, by enumerating with -hosts
@@ -234,18 +236,18 @@ class TestRepair(BaseRepairTest):
         @jira_ticket CASSANDRA-10422
         """
         cluster = self.cluster
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate([2, 2]).start(wait_for_binary_proto=True)
         node1_1, node2_1, node1_2, node2_2 = cluster.nodelist()
         node1_1.stress(stress_options=['write', 'n=100K', 'no-warmup', 
'cl=ONE', '-schema', 'replication(factor=4)', '-rate', 'threads=50'])
         node1_1.nodetool("repair -hosts 
127.0.0.1,127.0.0.2,127.0.0.3,127.0.0.4 keyspace1 standard1")
         for node in cluster.nodelist():
-            self.assertTrue(node.grep_log("Not a global repair"))
+            assert node.grep_log("Not a global repair")
         for node in cluster.nodelist():
-            self.assertFalse(node.grep_log("Starting anticompaction"))
+            assert not node.grep_log("Starting anticompaction")
 
-    @since('2.2.4', '4')
-    def no_anticompaction_after_subrange_repair_test(self):
+    @since('2.2.4', max_version='4')
+    def test_no_anticompaction_after_subrange_repair(self):
         """
         * Launch a three node, two DC cluster
         * Start a repair on a token range
@@ -255,15 +257,15 @@ class TestRepair(BaseRepairTest):
         @jira_ticket CASSANDRA-10422
         """
         cluster = self.cluster
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate(3).start(wait_for_binary_proto=True)
         node1, node2, node3 = cluster.nodelist()
         node1.stress(stress_options=['write', 'n=50K', 'no-warmup', 'cl=ONE', 
'-schema', 'replication(factor=3)', '-rate', 'threads=50'])
         node1.nodetool("repair -st 0 -et 1000 keyspace1 standard1")
         for node in cluster.nodelist():
-            self.assertTrue(node.grep_log("Not a global repair"))
+            assert node.grep_log("Not a global repair")
         for node in cluster.nodelist():
-            self.assertFalse(node.grep_log("Starting anticompaction"))
+            assert not node.grep_log("Starting anticompaction")
 
     def _get_repaired_data(self, node, keyspace):
         """
@@ -276,17 +278,17 @@ class TestRepair(BaseRepairTest):
         out = node.run_sstablemetadata(keyspace=keyspace).stdout
 
         def matches(pattern):
-            return filter(None, [pattern.match(l) for l in out.split('\n')])
+            return [_f for _f in [pattern.match(l) for l in 
out.decode("utf-8").split('\n')] if _f]
 
         names = [m.group(1) for m in matches(_sstable_name)]
         repaired_times = [int(m.group(1)) for m in matches(_repaired_at)]
 
-        self.assertTrue(names)
-        self.assertTrue(repaired_times)
+        assert names
+        assert repaired_times
         return [_sstable_data(*a) for a in zip(names, repaired_times)]
 
-    @since('2.2.10', '4')
-    def no_anticompaction_of_already_repaired_test(self):
+    @since('2.2.10', max_version='4')
+    def test_no_anticompaction_of_already_repaired(self):
         """
         * Launch three node cluster and stress with RF2
         * Do incremental repair to have all sstables flagged as repaired
@@ -294,9 +296,8 @@ class TestRepair(BaseRepairTest):
         * Verify that none of the already repaired sstables have been 
anti-compacted again
         @jira_ticket CASSANDRA-13153
         """
-
         cluster = self.cluster
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         # disable JBOD conf since the test expects sstables to be on the same 
disk
         cluster.set_datadir_count(1)
         cluster.populate(3).start(wait_for_binary_proto=True)
@@ -309,7 +310,7 @@ class TestRepair(BaseRepairTest):
         node1.nodetool("repair keyspace1 standard1")
         meta = self._get_repaired_data(node1, 'keyspace1')
         repaired = set([m for m in meta if m.repaired > 0])
-        self.assertEquals(len(repaired), len(meta))
+        assert len(repaired) == len(meta)
 
         # stop node2, stress and start full repair to find out how synced 
ranges affect repairedAt values
         node2.stop(wait_other_notice=True)
@@ -320,10 +321,10 @@ class TestRepair(BaseRepairTest):
         meta = self._get_repaired_data(node1, 'keyspace1')
         repairedAfterFull = set([m for m in meta if m.repaired > 0])
         # already repaired sstables must remain untouched
-        self.assertEquals(repaired.intersection(repairedAfterFull), repaired)
+        assert repaired.intersection(repairedAfterFull) == repaired
 
-    @since('2.2.1', '4')
+    @since('2.2.1', max_version='4')
-    def anticompaction_after_normal_repair_test(self):
+    def test_anticompaction_after_normal_repair(self):
         """
         * Launch a four node, two DC cluster
         * Start a normal repair
@@ -331,80 +332,80 @@ class TestRepair(BaseRepairTest):
         @jira_ticket CASSANDRA-10422
         """
         cluster = self.cluster
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate([2, 2]).start(wait_for_binary_proto=True)
         node1_1, node2_1, node1_2, node2_2 = cluster.nodelist()
         node1_1.stress(stress_options=['write', 'n=50K', 'no-warmup', 
'cl=ONE', '-schema', 'replication(factor=4)'])
         node1_1.nodetool("repair keyspace1 standard1")
         for node in cluster.nodelist():
-            self.assertTrue("Starting anticompaction")
+            assert "Starting anticompaction"
 
-    def simple_sequential_repair_test(self):
+    def test_simple_sequential_repair(self):
         """
         Calls simple repair test with a sequential repair
         """
         self._simple_repair(sequential=True)
 
-    def simple_parallel_repair_test(self):
+    def test_simple_parallel_repair(self):
         """
         Calls simple repair test with a parallel repair
         """
         self._simple_repair(sequential=False)
 
-    def empty_vs_gcable_sequential_repair_test(self):
+    def test_empty_vs_gcable_sequential_repair(self):
         """
         Calls empty_vs_gcable repair test with a sequential repair
         """
         self._empty_vs_gcable_no_repair(sequential=True)
 
-    def empty_vs_gcable_parallel_repair_test(self):
+    def test_empty_vs_gcable_parallel_repair(self):
         """
         Calls empty_vs_gcable repair test with a parallel repair
         """
         self._empty_vs_gcable_no_repair(sequential=False)
 
-    def range_tombstone_digest_sequential_repair_test(self):
+    def test_range_tombstone_digest_sequential_repair(self):
         """
         Calls range_tombstone_digest with a sequential repair
         """
         self._range_tombstone_digest(sequential=True)
 
-    def range_tombstone_digest_parallel_repair_test(self):
+    def test_range_tombstone_digest_parallel_repair(self):
         """
         Calls range_tombstone_digest with a parallel repair
         """
         self._range_tombstone_digest(sequential=False)
 
     @since('2.1')
-    def shadowed_cell_digest_sequential_repair_test(self):
+    def test_shadowed_cell_digest_sequential_repair(self):
         """
         Calls _cell_shadowed_by_range_tombstone with sequential repair
         """
         self._cell_shadowed_by_range_tombstone(sequential=True)
 
     @since('2.1')
-    def shadowed_cell_digest_parallel_repair_test(self):
+    def test_shadowed_cell_digest_parallel_repair(self):
         """
         Calls _cell_shadowed_by_range_tombstone with parallel repair
         """
         self._cell_shadowed_by_range_tombstone(sequential=False)
 
     @since('3.0')
-    def shadowed_range_tombstone_digest_sequential_repair_test(self):
+    def test_shadowed_range_tombstone_digest_sequential_repair(self):
         """
         Calls _range_tombstone_shadowed_by_range_tombstone with sequential 
repair
         """
         self._range_tombstone_shadowed_by_range_tombstone(sequential=True)
 
     @since('3.0')
-    def shadowed_range_tombstone_digest_parallel_repair_test(self):
+    def test_shadowed_range_tombstone_digest_parallel_repair(self):
         """
         Calls _range_tombstone_shadowed_by_range_tombstone with parallel repair
         """
         self._range_tombstone_shadowed_by_range_tombstone(sequential=False)
 
-    @no_vnodes()
-    def simple_repair_order_preserving_test(self):
+    @pytest.mark.no_vnodes
+    def test_simple_repair_order_preserving(self):
         """
         Calls simple repair test with OPP and sequential repair
         @jira_ticket CASSANDRA-5220
@@ -484,18 +485,18 @@ class TestRepair(BaseRepairTest):
         node2.stop(wait_other_notice=True)
         for cf in ['cf1', 'cf2']:
             # insert some data
-            for i in xrange(0, 10):
-                for j in xrange(0, 1000):
+            for i in range(0, 10):
+                for j in range(0, 1000):
                     query = SimpleStatement("INSERT INTO {} (key, c1, c2) 
VALUES ('k{}', 'v{}', 'value')".format(cf, i, j), 
consistency_level=ConsistencyLevel.ONE)
                     session.execute(query)
             node1.flush()
             # delete those data, half with row tombstone, and the rest with 
cell range tombstones
-            for i in xrange(0, 5):
+            for i in range(0, 5):
                 query = SimpleStatement("DELETE FROM {} WHERE 
key='k{}'".format(cf, i), consistency_level=ConsistencyLevel.ONE)
                 session.execute(query)
             node1.flush()
-            for i in xrange(5, 10):
-                for j in xrange(0, 1000):
+            for i in range(5, 10):
+                for j in range(0, 1000):
                     query = SimpleStatement("DELETE FROM {} WHERE key='k{}' 
AND c1='v{}'".format(cf, i, j), consistency_level=ConsistencyLevel.ONE)
                     session.execute(query)
             node1.flush()
@@ -509,17 +510,17 @@ class TestRepair(BaseRepairTest):
 
         # check no rows will be returned
         for cf in ['cf1', 'cf2']:
-            for i in xrange(0, 10):
+            for i in range(0, 10):
                 query = SimpleStatement("SELECT c1, c2 FROM {} WHERE 
key='k{}'".format(cf, i), consistency_level=ConsistencyLevel.ALL)
                 res = list(session.execute(query))
-                self.assertEqual(len(filter(lambda x: len(x) != 0, res)), 0, 
res)
+                assert len([x for x in res if len(x) != 0]) == 0, res
 
         # check log for no repair happened for gcable data
         out_of_sync_logs = node2.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync for cf1".format(cluster.address_regex(), cluster.address_regex()))
-        self.assertEqual(len(out_of_sync_logs), 0, "GC-able data does not need 
to be repaired with empty data: " + str([elt[0] for elt in out_of_sync_logs]))
+        assert len(out_of_sync_logs) == 0, "GC-able data does not need to be 
repaired with empty data: " + str([elt[0] for elt in out_of_sync_logs])
         # check log for actual repair for non gcable data
         out_of_sync_logs = node2.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync for cf2".format(cluster.address_regex(), cluster.address_regex()))
-        self.assertGreater(len(out_of_sync_logs), 0, "Non GC-able data should 
be repaired")
+        assert len(out_of_sync_logs) > 0, "Non GC-able data should be repaired"
 
     def _range_tombstone_digest(self, sequential):
         """
@@ -595,9 +596,9 @@ class TestRepair(BaseRepairTest):
 
         # check log for no repair happened for gcable data
         out_of_sync_logs = node2.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync for table1".format(cluster.address_regex(), 
cluster.address_regex()))
-        self.assertEqual(len(out_of_sync_logs), 0, "Digest mismatch for range 
tombstone: {}".format(str([elt[0] for elt in out_of_sync_logs])))
+        assert len(out_of_sync_logs) == 0, "Digest mismatch for range 
tombstone: {}".format(str([elt[0] for elt in out_of_sync_logs]))
 
-    def local_dc_repair_test(self):
+    def test_local_dc_repair(self):
         """
         * Set up a multi DC cluster
         * Perform a -local repair on one DC
@@ -607,25 +608,25 @@ class TestRepair(BaseRepairTest):
         node1 = cluster.nodes["node1"]
         node2 = cluster.nodes["node2"]
 
-        debug("starting repair...")
+        logger.debug("starting repair...")
         opts = ["-local"]
         opts += _repair_options(self.cluster.version(), ks="ks")
         node1.repair(opts)
 
         # Verify that only nodes in dc1 are involved in repair
         out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync".format(cluster.address_regex(), cluster.address_regex()))
-        self.assertEqual(len(out_of_sync_logs), 1, "Lines matching: 
{}".format(len(out_of_sync_logs)))
+        assert len(out_of_sync_logs) == 1, "Lines matching: 
{}".format(len(out_of_sync_logs))
 
         line, m = out_of_sync_logs[0]
         num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), {m.group(1), 
m.group(2)}
 
-        self.assertEqual(int(num_out_of_sync_ranges), 1, "Expecting 1 range 
out of sync for {}, but saw {}".format(out_of_sync_nodes, line))
+        assert int(num_out_of_sync_ranges) == 1, "Expecting 1 range out of 
sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges)
         valid_out_of_sync_pairs = {node1.address(), node2.address()}
-        self.assertEqual(out_of_sync_nodes, valid_out_of_sync_pairs, 
"Unrelated node found in local repair: {}, expected 
{}".format(out_of_sync_nodes, valid_out_of_sync_pairs))
+        assert out_of_sync_nodes == valid_out_of_sync_pairs, "Unrelated node 
found in local repair: {}, expected {}".format(out_of_sync_nodes, 
valid_out_of_sync_pairs)
         # Check node2 now has the key
         self.check_rows_on_node(node2, 2001, found=[1000], restart=False)
 
-    def dc_repair_test(self):
+    def test_dc_repair(self):
         """
         * Set up a multi DC cluster
         * Perform a -dc repair on two dc's
@@ -636,26 +637,26 @@ class TestRepair(BaseRepairTest):
         node2 = cluster.nodes["node2"]
         node3 = cluster.nodes["node3"]
 
-        debug("starting repair...")
+        logger.debug("starting repair...")
         opts = ["-dc", "dc1", "-dc", "dc2"]
         opts += _repair_options(self.cluster.version(), ks="ks")
         node1.repair(opts)
 
         # Verify that only nodes in dc1 and dc2 are involved in repair
         out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync".format(cluster.address_regex(), cluster.address_regex()))
-        self.assertEqual(len(out_of_sync_logs), 2, "Lines matching: " + 
str([elt[0] for elt in out_of_sync_logs]))
+        assert len(out_of_sync_logs) == 2, "Lines matching: " + str([elt[0] 
for elt in out_of_sync_logs])
         valid_out_of_sync_pairs = [{node1.address(), node2.address()},
                                    {node2.address(), node3.address()}]
 
         for line, m in out_of_sync_logs:
             num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), 
{m.group(1), m.group(2)}
-            self.assertEqual(int(num_out_of_sync_ranges), 1, "Expecting 1 
range out of sync for {}, but saw {}".format(out_of_sync_nodes, line))
-            self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, 
str(out_of_sync_nodes))
+            assert int(num_out_of_sync_ranges) == 1, "Expecting 1 range out of 
sync for {}, but saw {}".format(out_of_sync_nodes , num_out_of_sync_ranges)
+            assert out_of_sync_nodes, valid_out_of_sync_pairs in 
str(out_of_sync_nodes)
 
         # Check node2 now has the key
         self.check_rows_on_node(node2, 2001, found=[1000], restart=False)
 
-    def dc_parallel_repair_test(self):
+    def test_dc_parallel_repair(self):
         """
         * Set up a multi DC cluster
         * Perform a -dc repair on two dc's, with -dcpar
@@ -666,30 +667,30 @@ class TestRepair(BaseRepairTest):
         node2 = cluster.nodes["node2"]
         node3 = cluster.nodes["node3"]
 
-        debug("starting repair...")
+        logger.debug("starting repair...")
         opts = ["-dc", "dc1", "-dc", "dc2", "-dcpar"]
         opts += _repair_options(self.cluster.version(), ks="ks", 
sequential=False)
         node1.repair(opts)
 
         # Verify that only nodes in dc1 and dc2 are involved in repair
         out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync".format(cluster.address_regex(), cluster.address_regex()))
-        self.assertEqual(len(out_of_sync_logs), 2, "Lines matching: " + 
str([elt[0] for elt in out_of_sync_logs]))
+        assert len(out_of_sync_logs) == 2, "Lines matching: " + str([elt[0] 
for elt in out_of_sync_logs])
         valid_out_of_sync_pairs = [{node1.address(), node2.address()},
                                    {node2.address(), node3.address()}]
 
         for line, m in out_of_sync_logs:
             num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), 
{m.group(1), m.group(2)}
-            self.assertEqual(int(num_out_of_sync_ranges), 1, "Expecting 1 
range out of sync for {}, but saw {}".format(out_of_sync_nodes, line))
-            self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, 
str(out_of_sync_nodes))
+            assert int(num_out_of_sync_ranges) == 1, "Expecting 1 range out of 
sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges)
+            assert out_of_sync_nodes, valid_out_of_sync_pairs in 
str(out_of_sync_nodes)
 
         # Check node2 now has the key
         self.check_rows_on_node(node2, 2001, found=[1000], restart=False)
 
         # Check the repair was a dc parallel repair
         if self.cluster.version() >= '2.2':
-            self.assertEqual(len(node1.grep_log('parallelism: dc_parallel')), 
1, str(node1.grep_log('parallelism')))
+            assert len(node1.grep_log('parallelism: dc_parallel')) == 1, 
str(node1.grep_log('parallelism'))
         else:
-            self.assertEqual(len(node1.grep_log('parallelism=PARALLEL')), 1, 
str(node1.grep_log('parallelism')))
+            assert len(node1.grep_log('parallelism=PARALLEL')) == 1, 
str(node1.grep_log('parallelism'))
 
     def _setup_multi_dc(self):
         """
@@ -702,7 +703,7 @@ class TestRepair(BaseRepairTest):
         # interfer with the test (this must be after the populate)
         cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False})
         cluster.set_batch_commitlog(enabled=True)
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         # populate 2 nodes in dc1, and one node each in dc2 and dc3
         cluster.populate([2, 1, 1]).start(wait_for_binary_proto=True)
 
@@ -713,19 +714,19 @@ class TestRepair(BaseRepairTest):
         create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 
'text'})
 
         # Insert 1000 keys, kill node 2, insert 1 key, restart node 2, insert 
1000 more keys
-        debug("Inserting data...")
+        logger.debug("Inserting data...")
         insert_c1c2(session, n=1000, consistency=ConsistencyLevel.ALL)
         node2.flush()
         node2.stop(wait_other_notice=True)
         insert_c1c2(session, keys=(1000, ), consistency=ConsistencyLevel.THREE)
         node2.start(wait_for_binary_proto=True, wait_other_notice=True)
         node1.watch_log_for_alive(node2)
-        insert_c1c2(session, keys=range(1001, 2001), 
consistency=ConsistencyLevel.ALL)
+        insert_c1c2(session, keys=list(range(1001, 2001)), 
consistency=ConsistencyLevel.ALL)
 
         cluster.flush()
 
         # Verify that only node2 has only 2000 keys and others have 2001 keys
-        debug("Checking data...")
+        logger.debug("Checking data...")
         self.check_rows_on_node(node2, 2000, missings=[1000])
         for node in [node1, node3, node4]:
             self.check_rows_on_node(node, 2001, found=[1000])
@@ -739,7 +740,7 @@ class TestRepair(BaseRepairTest):
         Tests that multiple parallel repairs on the same table isn't
         causing reference leaks.
         """
-        self.ignore_log_patterns = [
+        self.fixture_dtest_setup.ignore_log_patterns = [
             "Cannot start multiple repair sessions over the same sstables",  # 
The message we are expecting
             "Validation failed in",                                          # 
Expecting validation to fail
             "RMI Runtime",                                                   # 
JMX Repair failures
@@ -749,7 +750,7 @@ class TestRepair(BaseRepairTest):
         ]
 
         cluster = self.cluster
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate([3]).start(wait_for_binary_proto=True)
         node1, node2, node3 = cluster.nodelist()
         node1.stress(stress_options=['write', 'n=10k', 'no-warmup', 'cl=ONE', 
'-schema', 'replication(factor=3)', '-rate', 'threads=50'])
@@ -771,10 +772,10 @@ class TestRepair(BaseRepairTest):
             if len(node.grep_log("Cannot start multiple repair sessions over 
the same sstables")) > 0:
                 found_message = True
                 break
-        self.assertTrue(found_message)
+        assert found_message
 
-    @no_vnodes()
-    def token_range_repair_test(self):
+    @pytest.mark.no_vnodes
+    def test_token_range_repair(self):
         """
         Test repair using the -st and -et options
         * Launch a three node cluster
@@ -786,15 +787,15 @@ class TestRepair(BaseRepairTest):
         cluster = self.cluster
         cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False})
         cluster.set_batch_commitlog(enabled=True)
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate(3).start(wait_for_binary_proto=True)
 
         node1, node2, node3 = cluster.nodelist()
 
         self._parameterized_range_repair(repair_opts=['-st', 
str(node3.initial_token), '-et', str(node1.initial_token)])
 
-    @no_vnodes()
-    def token_range_repair_test_with_cf(self):
+    @pytest.mark.no_vnodes
+    def test_token_range_repair_with_cf(self):
         """
         @jira_ticket CASSANDRA-11866
 
@@ -810,13 +811,13 @@ class TestRepair(BaseRepairTest):
         cluster = self.cluster
         cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False})
         cluster.set_batch_commitlog(enabled=True)
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate(3).start(wait_for_binary_proto=True)
 
         node1, node2, node3 = cluster.nodelist()
 
         # Insert data, kill node 2, insert more data, restart node 2, insert 
another set of data
-        debug("Inserting data...")
+        logger.debug("Inserting data...")
         node1.stress(['write', 'n=1k', 'no-warmup', 'cl=ALL', '-schema', 
'replication(factor=2)', '-rate', 'threads=30'])
         node2.flush()
         node2.stop(wait_other_notice=True)
@@ -829,25 +830,25 @@ class TestRepair(BaseRepairTest):
         opts = ['-st', str(node3.initial_token), '-et', 
str(node1.initial_token), ]
         opts += _repair_options(self.cluster.version(), ks='keyspace1', 
cf='counter1', sequential=False)
         node1.repair(opts)
-        self.assertEqual(len(node1.grep_log('are consistent for standard1')), 
0, "Nodes 1 and 2 should not be consistent.")
-        self.assertEqual(len(node3.grep_log('Repair command')), 0, "Node 3 
should not have been involved in the repair.")
+        assert len(node1.grep_log('are consistent for standard1')) == 0, 
"Nodes 1 and 2 should not be consistent."
+        assert len(node3.grep_log('Repair command')) == 0, "Node 3 should not 
have been involved in the repair."
         out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync".format(cluster.address_regex(), cluster.address_regex()))
-        self.assertEqual(len(out_of_sync_logs), 0, "We repaired the wrong CF, 
so things should still be broke")
+        assert len(out_of_sync_logs) == 0, "We repaired the wrong CF == so 
things should still be broke"
 
         # Repair only the range node 1 owns on the right  CF, assert 
everything is fixed
         opts = ['-st', str(node3.initial_token), '-et', 
str(node1.initial_token), ]
         opts += _repair_options(self.cluster.version(), ks='keyspace1', 
cf='standard1', sequential=False)
         node1.repair(opts)
-        self.assertEqual(len(node1.grep_log('are consistent for standard1')), 
0, "Nodes 1 and 2 should not be consistent.")
-        self.assertEqual(len(node3.grep_log('Repair command')), 0, "Node 3 
should not have been involved in the repair.")
+        assert len(node1.grep_log('are consistent for standard1')) == 0, 
"Nodes 1 and 2 should not be consistent."
+        assert len(node3.grep_log('Repair command')) == 0, "Node 3 should not 
have been involved in the repair."
         out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync".format(cluster.address_regex(), cluster.address_regex()))
         _, matches = out_of_sync_logs[0]
         out_of_sync_nodes = {matches.group(1), matches.group(2)}
         valid_out_of_sync_pairs = [{node1.address(), node2.address()}]
-        self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, 
str(out_of_sync_nodes))
+        assert out_of_sync_nodes in valid_out_of_sync_pairs, str(out_of_sync_nodes)
 
-    @no_vnodes()
-    def partitioner_range_repair_test(self):
+    @pytest.mark.no_vnodes
+    def test_partitioner_range_repair(self):
         """
         Test repair using the -pr option
         * Launch a three node cluster
@@ -859,7 +860,7 @@ class TestRepair(BaseRepairTest):
         cluster = self.cluster
         cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False})
         cluster.set_batch_commitlog(enabled=True)
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate(3).start(wait_for_binary_proto=True)
 
         node1, node2, node3 = cluster.nodelist()
@@ -867,8 +868,8 @@ class TestRepair(BaseRepairTest):
         self._parameterized_range_repair(repair_opts=['-pr'])
 
     @since('3.10')
-    @no_vnodes()
-    def pull_repair_test(self):
+    @pytest.mark.no_vnodes
+    def test_pull_repair(self):
         """
         Test repair using the --pull option
         @jira_ticket CASSANDRA-9876
@@ -883,7 +884,7 @@ class TestRepair(BaseRepairTest):
         cluster = self.cluster
         cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False})
         cluster.set_batch_commitlog(enabled=True)
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate(3).start(wait_for_binary_proto=True)
 
         node1, node2, node3 = cluster.nodelist()
@@ -894,14 +895,14 @@ class TestRepair(BaseRepairTest):
         self._parameterized_range_repair(repair_opts=['--pull', '--in-hosts', 
node1_address + ',' + node2_address, '-st', str(node3.initial_token), '-et', 
str(node1.initial_token)])
 
         # Node 1 should only receive files (as we ran a pull repair on node1)
-        self.assertTrue(len(node1.grep_log("Receiving [1-9][0-9]* files")) > 0)
-        self.assertEqual(len(node1.grep_log("sending [1-9][0-9]* files")), 0)
-        self.assertTrue(len(node1.grep_log("sending 0 files")) > 0)
+        assert len(node1.grep_log("Receiving [1-9][0-9]* files")) > 0
+        assert len(node1.grep_log("sending [1-9][0-9]* files")) == 0
+        assert len(node1.grep_log("sending 0 files")) > 0
 
         # Node 2 should only send files (as we ran a pull repair on node1)
-        self.assertEqual(len(node2.grep_log("Receiving [1-9][0-9]* files")), 0)
-        self.assertTrue(len(node2.grep_log("Receiving 0 files")) > 0)
-        self.assertTrue(len(node2.grep_log("sending [1-9][0-9]* files")) > 0)
+        assert len(node2.grep_log("Receiving [1-9][0-9]* files")) == 0
+        assert len(node2.grep_log("Receiving 0 files")) > 0
+        assert len(node2.grep_log("sending [1-9][0-9]* files")) > 0
 
     def _parameterized_range_repair(self, repair_opts):
         """
@@ -916,7 +917,7 @@ class TestRepair(BaseRepairTest):
         node1, node2, node3 = cluster.nodelist()
 
         # Insert data, kill node 2, insert more data, restart node 2, insert 
another set of data
-        debug("Inserting data...")
+        logger.debug("Inserting data...")
         node1.stress(['write', 'n=20K', 'no-warmup', 'cl=ALL', '-schema', 
'replication(factor=2)', '-rate', 'threads=30'])
 
         node2.flush()
@@ -934,8 +935,8 @@ class TestRepair(BaseRepairTest):
         opts += _repair_options(self.cluster.version(), ks='keyspace1', 
cf='standard1', sequential=False)
         node1.repair(opts)
 
-        self.assertEqual(len(node1.grep_log('are consistent for standard1')), 
0, "Nodes 1 and 2 should not be consistent.")
-        self.assertEqual(len(node3.grep_log('Repair command')), 0, "Node 3 
should not have been involved in the repair.")
+        assert len(node1.grep_log('are consistent for standard1')) == 0, 
"Nodes 1 and 2 should not be consistent."
+        assert len(node3.grep_log('Repair command')) == 0, "Node 3 should not 
have been involved in the repair."
 
         out_of_sync_logs = node1.grep_log("{} and {} have ([0-9]+) range\(s\) 
out of sync".format(cluster.address_regex(), cluster.address_regex()))
         _, matches = out_of_sync_logs[0]
@@ -943,10 +944,10 @@ class TestRepair(BaseRepairTest):
 
         valid_out_of_sync_pairs = [{node1.address(), node2.address()}]
 
-        self.assertIn(out_of_sync_nodes, valid_out_of_sync_pairs, 
str(out_of_sync_nodes))
+        assert out_of_sync_nodes in valid_out_of_sync_pairs, str(out_of_sync_nodes)
 
     @since('2.2')
-    def trace_repair_test(self):
+    def test_trace_repair(self):
         """
         * Launch a three node cluster
         * Insert some data at RF 2
@@ -957,12 +958,12 @@ class TestRepair(BaseRepairTest):
         cluster = self.cluster
         cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False})
         cluster.set_batch_commitlog(enabled=True)
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate(3).start(wait_for_binary_proto=True)
 
         node1, node2, node3 = cluster.nodelist()
 
-        debug("Inserting data...")
+        logger.debug("Inserting data...")
         node1.stress(['write', 'n=20K', 'no-warmup', 'cl=ALL', '-schema', 
'replication(factor=2)', '-rate', 'threads=30'])
 
         node2.flush()
@@ -984,12 +985,11 @@ class TestRepair(BaseRepairTest):
         rows = list(session.execute("SELECT activity FROM 
system_traces.events"))
         # This check assumes that the only (or at least first) thing to write 
to `system_traces.events.activity` is
         # the repair task triggered in the test.
-        self.assertIn('job threads: {}'.format(job_thread_count),
-                      rows[0][0],
-                      'Expected {} job threads in repair options. Instead we 
saw {}'.format(job_thread_count, rows[0][0]))
+        assert 'job threads: {}'.format(job_thread_count) in rows[0][0], \
+            'Expected {} job threads in repair options. Instead we saw 
{}'.format(job_thread_count, rows[0][0])
 
     @since('2.2')
-    def thread_count_repair_test(self):
+    def test_thread_count_repair(self):
         """
         * Launch a three node cluster
         * Insert some data at RF 2
@@ -1001,14 +1001,14 @@ class TestRepair(BaseRepairTest):
         cluster = self.cluster
         cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False})
         cluster.set_batch_commitlog(enabled=True)
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate(3).start(wait_for_binary_proto=True)
 
         node1, node2, node3 = cluster.nodelist()
 
         # Valid job thread counts: 1, 2, 3, and 4
         for job_thread_count in range(1, 5):
-            debug("Inserting data...")
+            logger.debug("Inserting data...")
             node1.stress(['write', 'n=2K', 'no-warmup', 'cl=ALL', '-schema', 
'replication(factor=2)', '-rate',
                           'threads=30', '-pop', 'seq={}..{}K'.format(2 * 
(job_thread_count - 1), 2 * job_thread_count)])
 
@@ -1032,11 +1032,10 @@ class TestRepair(BaseRepairTest):
             rows = list(session.execute("SELECT activity FROM 
system_traces.events"))
             # This check assumes that the only (or at least first) thing to 
write to `system_traces.events.activity` is
             # the repair task triggered in the test.
-            self.assertIn('job threads: {}'.format(job_thread_count),
-                          rows[0][0],
-                          'Expected {} job threads in repair options. Instead 
we saw {}'.format(job_thread_count, rows[0][0]))
+            assert 'job threads: {}'.format(job_thread_count) in rows[0][0], \
+                'Expected {} job threads in repair options. Instead we saw 
{}'.format(job_thread_count, rows[0][0])
 
-    @no_vnodes()
+    @pytest.mark.no_vnodes
     def test_multiple_concurrent_repairs(self):
         """
         @jira_ticket CASSANDRA-11451
@@ -1061,7 +1060,7 @@ class TestRepair(BaseRepairTest):
         node1.stop(wait_other_notice=True)
         node3.stop(wait_other_notice=True)
         _, _, rc = node2.stress(['read', 'n=1M', 'no-warmup', '-rate', 
'threads=30'], whitelist=True)
-        self.assertEqual(rc, 0)
+        assert rc == 0
 
     @since('4.0')
     def test_wide_row_repair(self):
@@ -1075,7 +1074,7 @@ class TestRepair(BaseRepairTest):
         node1, node2 = cluster.nodelist()
         node2.stop(wait_other_notice=True)
         profile_path = os.path.join(os.getcwd(), 
'stress_profiles/repair_wide_rows.yaml')
-        print("yaml = " + profile_path)
+        logger.info(("yaml = " + profile_path))
         node1.stress(['user', 'profile=' + profile_path, 'n=50', 
'ops(insert=1)', 'no-warmup', '-rate', 'threads=8',
                       '-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)'])
         node2.start(wait_for_binary_proto=True)
@@ -1101,12 +1100,12 @@ class TestRepair(BaseRepairTest):
             node1.watch_log_for('requesting merkle trees', 
filename='system.log')
             time.sleep(2)
 
-        debug("stopping node1")
+        logger.debug("stopping node1")
         node1.stop(gently=False, wait_other_notice=True)
         t1.join()
-        debug("starting node1 - first repair should have failed")
+        logger.debug("starting node1 - first repair should have failed")
         node1.start(wait_for_binary_proto=True, wait_other_notice=True)
-        debug("running second repair")
+        logger.debug("running second repair")
         if cluster.version() >= "2.2":
             node1.repair()
         else:
@@ -1126,7 +1125,7 @@ class TestRepair(BaseRepairTest):
         """
         self._test_failure_during_repair(phase='sync', initiator=False,)
 
-    @since('2.2', '4')
+    @since('2.2', max_version='4')
     def test_failure_during_anticompaction(self):
         """
         @jira_ticket CASSANDRA-12901
@@ -1144,48 +1143,49 @@ class TestRepair(BaseRepairTest):
         cluster = self.cluster
         # We are not interested in specific errors, but
         # that the repair session finishes on node failure without hanging
-        self.ignore_log_patterns = [
+        self.fixture_dtest_setup.ignore_log_patterns = [
             "Endpoint .* died",
             "Streaming error occurred",
             "StreamReceiveTask",
             "Stream failed",
             "Session completed with the following error",
             "Repair session .* for range .* failed with error",
-            "Sync failed between .* and .*"
+            "Sync failed between .* and .*",
+            "failed to send a stream message/file to peer"
         ]
 
         # Disable hinted handoff and set batch commit log so this doesn't
         # interfere with the test (this must be after the populate)
         cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False})
         cluster.set_batch_commitlog(enabled=True)
-        debug("Setting up cluster..")
+        logger.debug("Setting up cluster..")
         cluster.populate(3)
         node1, node2, node3 = cluster.nodelist()
 
         node_to_kill = node2 if (phase == 'sync' and initiator) else node3
-        debug("Setting up byteman on {}".format(node_to_kill.name))
+        logger.debug("Setting up byteman on {}".format(node_to_kill.name))
         # set up byteman
         node_to_kill.byteman_port = '8100'
         node_to_kill.import_config_files()
 
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.start(wait_other_notice=True)
 
-        debug("stopping node3")
+        logger.debug("stopping node3")
         node3.stop(gently=False, wait_other_notice=True)
 
         self.patient_exclusive_cql_connection(node1)
-        debug("inserting data while node3 is down")
+        logger.debug("inserting data while node3 is down")
         node1.stress(stress_options=['write', 'n=1k',
                                      'no-warmup', 'cl=ONE',
                                      '-schema', 'replication(factor=3)',
                                      '-rate', 'threads=10'])
 
-        debug("bring back node3")
+        logger.debug("bring back node3")
         node3.start(wait_other_notice=True, wait_for_binary_proto=True)
 
         script = 'stream_sleep.btm' if phase == 'sync' else 
'repair_{}_sleep.btm'.format(phase)
-        debug("Submitting byteman script to {}".format(node_to_kill.name))
+        logger.debug("Submitting byteman script to 
{}".format(node_to_kill.name))
         # Sleep on anticompaction/stream so there will be time for node to be 
killed
         node_to_kill.byteman_submit(['./byteman/{}'.format(script)])
 
@@ -1193,15 +1193,15 @@ class TestRepair(BaseRepairTest):
             global nodetool_error
             try:
                 node1.nodetool('repair keyspace1 standard1')
-            except Exception, e:
+            except Exception as e:
                 nodetool_error = e
 
-        debug("repair node1")
+        logger.debug("repair node1")
         # Launch in a external thread so it does not hang process
         t = Thread(target=node1_repair)
         t.start()
 
-        debug("Will kill {} in middle of {}".format(node_to_kill.name, phase))
+        logger.debug("Will kill {} in middle of {}".format(node_to_kill.name, 
phase))
         msg_to_wait = 'streaming plan for Repair'
         if phase == 'anticompaction':
             msg_to_wait = 'Got anticompaction request'
@@ -1210,10 +1210,10 @@ class TestRepair(BaseRepairTest):
         node_to_kill.watch_log_for(msg_to_wait, filename='debug.log')
         node_to_kill.stop(gently=False, wait_other_notice=True)
 
-        debug("Killed {}, now waiting repair to 
finish".format(node_to_kill.name))
+        logger.debug("Killed {}, now waiting repair to 
finish".format(node_to_kill.name))
         t.join(timeout=60)
-        self.assertFalse(t.isAlive(), 'Repair still running after sync {} was 
killed'
-                                      .format("initiator" if initiator else 
"participant"))
+        assert not t.is_alive(), 'Repair still running after sync {} was 
killed'\
+            .format("initiator" if initiator else "participant")
 
         if cluster.version() < '4.0' or phase != 'sync':
             # the log entry we're watching for in the sync task came from the
@@ -1227,7 +1227,7 @@ RepairTableContents = namedtuple('RepairTableContents',
 
 
 @since('2.2')
-@attr("resource-intensive")
+@pytest.mark.resource_intensive
 class TestRepairDataSystemTable(Tester):
     """
     @jira_ticket CASSANDRA-5839
@@ -1237,21 +1237,19 @@ class TestRepairDataSystemTable(Tester):
     to a cluster, then ensuring these tables are in valid states before and
     after running repair.
     """
-
-    def setUp(self):
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_set_cluster_settings(self, fixture_dtest_setup):
         """
         Prepares a cluster for tests of the repair history tables by starting
         a 5-node cluster, then inserting 5000 values with RF=3.
         """
-
-        Tester.setUp(self)
-        self.cluster.populate(5).start(wait_for_binary_proto=True)
+        fixture_dtest_setup.cluster.populate(5).start(wait_for_binary_proto=True)
         self.node1 = self.cluster.nodelist()[0]
-        self.session = self.patient_cql_connection(self.node1)
+        self.session = fixture_dtest_setup.patient_cql_connection(self.node1)
 
         self.node1.stress(stress_options=['write', 'n=5K', 'no-warmup', 
'cl=ONE', '-schema', 'replication(factor=3)'])
 
-        self.cluster.flush()
+        fixture_dtest_setup.cluster.flush()
 
     def repair_table_contents(self, node, include_system_keyspaces=True):
         """
@@ -1281,15 +1279,15 @@ class TestRepairDataSystemTable(Tester):
         return RepairTableContents(parent_repair_history=parent_repair_history,
                                    repair_history=repair_history)
 
-    @skip('hangs CI')
-    def initial_empty_repair_tables_test(self):
-        debug('repair tables:')
-        debug(self.repair_table_contents(node=self.node1, 
include_system_keyspaces=False))
+    @pytest.mark.skip(reason='hangs CI')
+    def test_initial_empty_repair_tables(self):
+        logger.debug('repair tables:')
+        logger.debug(self.repair_table_contents(node=self.node1, 
include_system_keyspaces=False))
         repair_tables_dict = self.repair_table_contents(node=self.node1, 
include_system_keyspaces=False)._asdict()
-        for table_name, table_contents in repair_tables_dict.items():
-            self.assertFalse(table_contents, '{} is 
non-empty'.format(table_name))
+        for table_name, table_contents in list(repair_tables_dict.items()):
+            assert not table_contents, '{} is non-empty'.format(table_name)
 
-    def repair_parent_table_test(self):
+    def test_repair_parent_table(self):
         """
         Test that `system_distributed.parent_repair_history` is properly 
populated
         after repair by:
@@ -1299,9 +1297,9 @@ class TestRepairDataSystemTable(Tester):
         """
         self.node1.repair()
         parent_repair_history, _ = self.repair_table_contents(node=self.node1, 
include_system_keyspaces=False)
-        self.assertTrue(len(parent_repair_history))
+        assert len(parent_repair_history)
 
-    def repair_table_test(self):
+    def test_repair_table(self):
         """
         Test that `system_distributed.repair_history` is properly populated
         after repair by:
@@ -1311,4 +1309,4 @@ class TestRepairDataSystemTable(Tester):
         """
         self.node1.repair()
         _, repair_history = self.repair_table_contents(node=self.node1, 
include_system_keyspaces=False)
-        self.assertTrue(len(repair_history))
+        assert len(repair_history)
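
For reference, the following is a minimal standalone sketch of the nose-to-pytest idioms applied throughout this file: module-level `since`/`logger` aliases, and bare `assert` statements in place of `self.assertEqual`/`self.assertIn`. The test body and data below are illustrative only and are not part of the commit; handling of the `since` mark by the dtest conftest is assumed rather than shown.

import logging
import pytest

# Module-level aliases matching the ones added at the top of repair_test.py.
since = pytest.mark.since
logger = logging.getLogger(__name__)


@since('2.2.1', max_version='4')  # version bounds are assumed to be evaluated by the dtest conftest
def test_assertion_conversion_sketch():
    # Stand-in for node.grep_log(...) output: a list of (line, match) tuples.
    out_of_sync_logs = [("line a", None), ("line b", None)]
    logger.debug("checking %d matching lines", len(out_of_sync_logs))

    # self.assertEqual(len(x), 2, msg)    ->  assert len(x) == 2, msg
    assert len(out_of_sync_logs) == 2, "Lines matching: {}".format([elt[0] for elt in out_of_sync_logs])

    # self.assertIn(item, container, msg) ->  assert item in container, msg
    valid_out_of_sync_pairs = [{"127.0.0.1", "127.0.0.3"}]
    assert {"127.0.0.3", "127.0.0.1"} in valid_out_of_sync_pairs, "unexpected node pair"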

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/replace_address_test.py
----------------------------------------------------------------------
diff --git a/replace_address_test.py b/replace_address_test.py
index 2b60077..31c1394 100644
--- a/replace_address_test.py
+++ b/replace_address_test.py
@@ -1,18 +1,24 @@
 import os
 import tempfile
+import pytest
+import logging
+import time
+
+from flaky import flaky
+
 from itertools import chain
 from shutil import rmtree
-from unittest import skipIf
 
 from cassandra import ConsistencyLevel, ReadTimeout, Unavailable
 from cassandra.query import SimpleStatement
 from ccmlib.node import Node
-from nose.plugins.attrib import attr
 
-from dtest import CASSANDRA_VERSION_FROM_BUILD, DISABLE_VNODES, Tester, debug
+from dtest import CASSANDRA_VERSION_FROM_BUILD, Tester
 from tools.assertions import assert_bootstrap_state, assert_all, assert_not_running
 from tools.data import rows_to_list
-from tools.decorators import since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class NodeUnavailable(Exception):
@@ -20,24 +26,26 @@ class NodeUnavailable(Exception):
 
 
 class BaseReplaceAddressTest(Tester):
-    __test__ = False
-    replacement_node = None
-    ignore_log_patterns = (
-        # This one occurs when trying to send the migration to a
-        # node that hasn't started yet, and when it does, it gets
-        # replayed and everything is fine.
-        r'Can\'t send migration request: node.*is down',
-        r'Migration task failed to complete',  # 10978
-        # ignore streaming error during bootstrap
-        r'Streaming error occurred',
-        r'failed stream session'
-    )
+
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.ignore_log_patterns = (
+            # This one occurs when trying to send the migration to a
+            # node that hasn't started yet, and when it does, it gets
+            # replayed and everything is fine.
+            r'Can\'t send migration request: node.*is down',
+            r'Migration task failed to complete',  # 10978
+            # ignore streaming error during bootstrap
+            r'Streaming error occurred',
+            r'failed stream session',
+            r'Failed to properly handshake with peer'
+        )
 
     def _setup(self, n=3, opts=None, enable_byteman=False, 
mixed_versions=False):
-        debug("Starting cluster with {} nodes.".format(n))
-        self.cluster.populate(n, use_vnodes=not DISABLE_VNODES)
+        logger.debug("Starting cluster with {} nodes.".format(n))
+        self.cluster.populate(n)
         if opts is not None:
-            debug("Setting cluster options: {}".format(opts))
+            logger.debug("Setting cluster options: {}".format(opts))
             self.cluster.set_configuration_options(opts)
 
         self.cluster.set_batch_commitlog(enabled=True)
@@ -46,7 +54,7 @@ class BaseReplaceAddressTest(Tester):
 
         self.cluster.seeds.remove(self.replaced_node)
         NUM_TOKENS = os.environ.get('NUM_TOKENS', '256')
-        if DISABLE_VNODES:
+        if not self.dtest_config.use_vnodes:
+            self.cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': 1})
         else:
             self.cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': NUM_TOKENS})
@@ -57,7 +65,7 @@ class BaseReplaceAddressTest(Tester):
             self.query_node.import_config_files()
 
         if mixed_versions:
-            debug("Starting nodes on version 2.2.4")
+            logger.debug("Starting nodes on version 2.2.4")
             self.cluster.set_install_dir(version="2.2.4")
 
         self.cluster.start()
@@ -83,12 +91,12 @@ class BaseReplaceAddressTest(Tester):
                 replacement_address = self.replaced_node.address()
                 self.cluster.remove(self.replaced_node)
 
-            debug("Starting replacement node {} with jvm_option 
'{}={}'".format(replacement_address, jvm_option, replace_address))
+            logger.debug("Starting replacement node {} with jvm_option 
'{}={}'".format(replacement_address, jvm_option, replace_address))
             self.replacement_node = Node('replacement', cluster=self.cluster, 
auto_bootstrap=True,
                                          thrift_interface=None, 
storage_interface=(replacement_address, 7000),
                                          jmx_port='7400', 
remote_debug_port='0', initial_token=None, 
binary_interface=(replacement_address, 9042))
             if opts is not None:
-                debug("Setting options on replacement node: {}".format(opts))
+                logger.debug("Setting options on replacement node: 
{}".format(opts))
                 self.replacement_node.set_configuration_options(opts)
             self.cluster.add(self.replacement_node, False, 
data_center=data_center)
 
@@ -107,39 +115,41 @@ class BaseReplaceAddressTest(Tester):
 
     def _stop_node_to_replace(self, gently=False, table='keyspace1.standard1', cl=ConsistencyLevel.THREE):
         if self.replaced_node.is_running():
-            debug("Stopping {}".format(self.replaced_node.name))
+            logger.debug("Stopping {}".format(self.replaced_node.name))
             self.replaced_node.stop(gently=gently, wait_other_notice=True)
 
-        debug("Testing node stoppage (query should fail).")
-        with self.assertRaises((Unavailable, ReadTimeout)):
+        logger.debug("Testing node stoppage (query should fail).")
+        with pytest.raises((Unavailable, ReadTimeout)):
             session = self.patient_cql_connection(self.query_node)
             query = SimpleStatement('select * from {}'.format(table), consistency_level=cl)
             session.execute(query)
 
     def _insert_data(self, n='1k', rf=3, whitelist=False):
-        debug("Inserting {} entries with rf={} with stress...".format(n, rf))
-        self.query_node.stress(['write', 'n={}'.format(n), 'no-warmup', '-schema', 'replication(factor={})'.format(rf)],
+        logger.debug("Inserting {} entries with rf={} with stress...".format(n, rf))
+        self.query_node.stress(['write', 'n={}'.format(n), 'no-warmup', '-schema', 'replication(factor={})'.format(rf),
+                                '-rate', 'threads=10'],
                                whitelist=whitelist)
         self.cluster.flush()
+        time.sleep(20)
 
     def _fetch_initial_data(self, table='keyspace1.standard1', cl=ConsistencyLevel.THREE, limit=10000):
-        debug("Fetching initial data from {} on {} with CL={} and LIMIT={}".format(table, self.query_node.name, cl, limit))
+        logger.debug("Fetching initial data from {} on {} with CL={} and LIMIT={}".format(table, self.query_node.name, cl, limit))
         session = self.patient_cql_connection(self.query_node)
         query = SimpleStatement('select * from {} LIMIT {}'.format(table, limit), consistency_level=cl)
-        return rows_to_list(session.execute(query))
+        return rows_to_list(session.execute(query, timeout=20))
 
     def _verify_data(self, initial_data, table='keyspace1.standard1', cl=ConsistencyLevel.ONE, limit=10000,
                      restart_nodes=False):
-        self.assertGreater(len(initial_data), 0, "Initial data must be greater than 0")
+        assert len(initial_data) > 0, "Initial data must be greater than 0"
 
         # query should work again
-        debug("Stopping old nodes")
+        logger.debug("Stopping old nodes")
         for node in self.cluster.nodelist():
             if node.is_running() and node != self.replacement_node:
-                debug("Stopping {}".format(node.name))
+                logger.debug("Stopping {}".format(node.name))
                 node.stop(gently=False, wait_other_notice=True)
 
-        debug("Verifying {} on {} with CL={} and LIMIT={}".format(table, 
self.replacement_node.address(), cl, limit))
+        logger.debug("Verifying {} on {} with CL={} and 
LIMIT={}".format(table, self.replacement_node.address(), cl, limit))
         session = self.patient_exclusive_cql_connection(self.replacement_node)
         assert_all(session, 'select * from {} LIMIT {}'.format(table, limit),
                    expected=initial_data,
@@ -166,22 +176,22 @@ class BaseReplaceAddressTest(Tester):
                                    timeout=60)
 
     def _verify_tokens_migrated_successfully(self, previous_log_size=None):
-        if DISABLE_VNODES:
+        if not self.dtest_config.use_vnodes:
             num_tokens = 1
         else:
             # a little hacky but grep_log returns the whole line...
             num_tokens = int(self.replacement_node.get_conf_option('num_tokens'))
 
-        debug("Verifying {} tokens migrated sucessfully".format(num_tokens))
+        logger.debug("Verifying {} tokens migrated 
sucessfully".format(num_tokens))
         logs = self.replacement_node.grep_log(r"Token (.*?) changing ownership 
from /{} to /{}"
                                               
.format(self.replaced_node.address(),
                                                       
self.replacement_node.address()))
         if (previous_log_size is not None):
-            self.assertEquals(len(logs), previous_log_size)
+            assert len(logs) == previous_log_size
 
         moved_tokens = set([l[1].group(1) for l in logs])
-        debug("number of moved tokens: {}".format(len(moved_tokens)))
-        self.assertEquals(len(moved_tokens), num_tokens)
+        logger.debug("number of moved tokens: {}".format(len(moved_tokens)))
+        assert len(moved_tokens) == num_tokens
 
         return len(logs)
 
@@ -197,11 +207,11 @@ class BaseReplaceAddressTest(Tester):
         self._stop_node_to_replace()
 
         if mixed_versions:
-            debug("Upgrading all except {} to current 
version".format(self.query_node.address()))
+            logger.debug("Upgrading all except {} to current 
version".format(self.query_node.address()))
             self.cluster.set_install_dir(install_dir=default_install_dir)
             for node in self.cluster.nodelist():
                 if node.is_running() and node != self.query_node:
-                    debug("Upgrading {} to current 
version".format(node.address()))
+                    logger.debug("Upgrading {} to current 
version".format(node.address()))
                     node.stop(gently=True, wait_other_notice=True)
                     node.start(wait_other_notice=True, 
wait_for_binary_proto=True)
 
@@ -216,7 +226,7 @@ class BaseReplaceAddressTest(Tester):
         if not same_address and not mixed_versions:
             initial_data = self._fetch_initial_data(cl=ConsistencyLevel.TWO)
 
-        debug("Joining replaced node")
+        logger.debug("Joining replaced node")
         self.replacement_node.nodetool("join")
 
         if not same_address:
@@ -229,33 +239,32 @@ class BaseReplaceAddressTest(Tester):
 
 
 class TestReplaceAddress(BaseReplaceAddressTest):
-    __test__ = True
 
-    @attr('resource-intensive')
-    def replace_stopped_node_test(self):
+    @pytest.mark.resource_intensive
+    def test_replace_stopped_node(self):
         """
         Test that we can replace a node that is not shutdown gracefully.
         """
         self._test_replace_node(gently=False)
 
-    @attr('resource-intensive')
-    def replace_shutdown_node_test(self):
+    @pytest.mark.resource_intensive
+    def test_replace_shutdown_node(self):
         """
         @jira_ticket CASSANDRA-9871
         Test that we can replace a node that is shutdown gracefully.
         """
         self._test_replace_node(gently=True)
 
-    @attr('resource-intensive')
-    def replace_stopped_node_same_address_test(self):
+    @pytest.mark.resource_intensive
+    def test_replace_stopped_node_same_address(self):
         """
         @jira_ticket CASSANDRA-8523
         Test that we can replace a node with the same address correctly
         """
         self._test_replace_node(gently=False, same_address=True)
 
-    @attr('resource-intensive')
-    def replace_first_boot_test(self):
+    @pytest.mark.resource_intensive
+    def test_replace_first_boot(self):
         self._test_replace_node(jvm_option='replace_address_first_boot')
 
     def _test_replace_node(self, gently=False, jvm_option='replace_address', same_address=False):
@@ -288,32 +297,35 @@ class TestReplaceAddress(BaseReplaceAddressTest):
 
         self._verify_data(initial_data)
 
-    @attr('resource-intensive')
-    def replace_active_node_test(self):
-        self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Exception encountered during startup']
+    @pytest.mark.resource_intensive
+    def test_replace_active_node(self):
+        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
+            r'Exception encountered during startup']
+
         self._setup(n=3)
         self._do_replace(wait_for_binary_proto=False)
 
-        debug("Waiting for replace to fail")
+        logger.debug("Waiting for replace to fail")
         
self.replacement_node.watch_log_for("java.lang.UnsupportedOperationException: 
Cannot replace a live node...")
         assert_not_running(self.replacement_node)
 
-    @attr('resource-intensive')
-    def replace_nonexistent_node_test(self):
-        self.ignore_log_patterns = list(self.ignore_log_patterns) + [
+    @pytest.mark.resource_intensive
+    def test_replace_nonexistent_node(self):
+        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
             # This is caused by starting a node improperly (replacing active/nonexistent)
             r'Exception encountered during startup',
             # This is caused by trying to replace a nonexistent node
             r'Exception in thread Thread']
+
         self._setup(n=3)
         self._do_replace(replace_address='127.0.0.5', wait_for_binary_proto=False)

-        debug("Waiting for replace to fail")
+        logger.debug("Waiting for replace to fail")
         self.replacement_node.watch_log_for("java.lang.RuntimeException: Cannot replace_address /127.0.0.5 because it doesn't exist in gossip")
         assert_not_running(self.replacement_node)
 
     @since('3.6')
-    def fail_without_replace_test(self):
+    def test_fail_without_replace(self):
         """
         When starting a node from a clean slate with the same address as
         an existing down node, the node should error out even when
@@ -321,14 +333,16 @@ class TestReplaceAddress(BaseReplaceAddressTest):
         to use replace_address.
         @jira_ticket CASSANDRA-10134
         """
-        self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Exception encountered during startup']
+        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
+            r'Exception encountered during startup']
+
         self._setup(n=3)
         self._insert_data()
         node1, node2, node3 = self.cluster.nodelist()
 
         mark = None
         for auto_bootstrap in (True, False):
-            debug("Stopping node 3.")
+            logger.debug("Stopping node 3.")
             node3.stop(gently=False)
 
             # completely delete the data, commitlog, and saved caches
@@ -339,13 +353,13 @@ class TestReplaceAddress(BaseReplaceAddressTest):
                     rmtree(d)
 
             node3.set_configuration_options(values={'auto_bootstrap': auto_bootstrap})
-            debug("Starting node 3 with auto_bootstrap = {val}".format(val=auto_bootstrap))
+            logger.debug("Starting node 3 with auto_bootstrap = {val}".format(val=auto_bootstrap))
             node3.start(wait_other_notice=False)
             node3.watch_log_for('Use cassandra.replace_address if you want to replace this node', from_mark=mark, timeout=20)
             mark = node3.mark_log()
 
     @since('3.6')
-    def unsafe_replace_test(self):
+    def test_unsafe_replace(self):
         """
         To handle situations such as failed disk in a JBOD, it may be desirable to
         replace a node without bootstrapping. In such scenarios best practice
@@ -359,14 +373,16 @@ class TestReplaceAddress(BaseReplaceAddressTest):
 
         @jira_ticket CASSANDRA-10134
         """
-        self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Exception encountered during startup']
+        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
+            r'Exception encountered during startup']
+
         self._setup(n=3)
         self._insert_data()
         initial_data = self._fetch_initial_data()
         self.replacement_node = self.replaced_node
 
         for set_allow_unsafe_flag in [False, True]:
-            debug("Stopping {}".format(self.replaced_node.name))
+            logger.debug("Stopping {}".format(self.replaced_node.name))
             self.replaced_node.stop(gently=False)
 
             # completely delete the system keyspace data plus commitlog and saved caches
@@ -384,27 +400,27 @@ class TestReplaceAddress(BaseReplaceAddressTest):
             mark = self.replacement_node.mark_log()
 
             if set_allow_unsafe_flag:
-                debug('Starting replacement node with auto_bootstrap = false and replace_address = {} and allow_unsafe_replace = true'.format(self.replaced_node.address()))
+                logger.debug('Starting replacement node with auto_bootstrap = false and replace_address = {} and allow_unsafe_replace = true'.format(self.replaced_node.address()))
                 self._do_replace(extra_jvm_args=['-Dcassandra.allow_unsafe_replace=true'])
                 self._verify_data(initial_data)
             else:
-                debug('Starting replacement node with auto_bootstrap = false and replace_address = {}'.format(self.replaced_node.address()))
+                logger.debug('Starting replacement node with auto_bootstrap = false and replace_address = {}'.format(self.replaced_node.address()))
                 self._do_replace(wait_for_binary_proto=False)
                 self.replacement_node.watch_log_for('To perform this operation, please restart with -Dcassandra.allow_unsafe_replace=true',
                                                     from_mark=mark, timeout=20)
 
-    @skipIf(CASSANDRA_VERSION_FROM_BUILD == '3.9', "Test doesn't run on 3.9")
+    @pytest.mark.skipif(CASSANDRA_VERSION_FROM_BUILD == '3.9', reason="Test doesn't run on 3.9")
     @since('2.2')
-    def insert_data_during_replace_same_address_test(self):
+    def test_insert_data_during_replace_same_address(self):
         """
         Test that replacement node with same address DOES NOT receive writes during replacement
         @jira_ticket CASSANDRA-8523
         """
         self._test_insert_data_during_replace(same_address=True)
 
-    @skipIf(CASSANDRA_VERSION_FROM_BUILD == '3.9', "Test doesn't run on 3.9")
+    @pytest.mark.skipif(CASSANDRA_VERSION_FROM_BUILD == '3.9', reason="Test doesn't run on 3.9")
     @since('2.2')
-    def insert_data_during_replace_different_address_test(self):
+    def test_insert_data_during_replace_different_address(self):
         """
         Test that replacement node with different address DOES receive writes during replacement
         @jira_ticket CASSANDRA-8523
@@ -412,8 +428,8 @@ class TestReplaceAddress(BaseReplaceAddressTest):
         self._test_insert_data_during_replace(same_address=False)
 
     @since('2.2')
-    @attr('resource-intensive')
-    def resume_failed_replace_test(self):
+    @pytest.mark.resource_intensive
+    def test_resume_failed_replace(self):
         """
         Test resumable bootstrap while replacing node. Feature introduced in
         2.2 with ticket https://issues.apache.org/jira/browse/CASSANDRA-8838
@@ -423,21 +439,23 @@ class TestReplaceAddress(BaseReplaceAddressTest):
         self._test_restart_failed_replace(mode='resume')
 
     @since('2.2')
-    @attr('resource-intensive')
-    def restart_failed_replace_with_reset_resume_state_test(self):
+    @pytest.mark.resource_intensive
+    def test_restart_failed_replace_with_reset_resume_state(self):
         """Test replace with resetting bootstrap progress"""
         self._test_restart_failed_replace(mode='reset_resume_state')
 
     @since('2.2')
-    @attr('resource-intensive')
-    def restart_failed_replace_test(self):
+    @pytest.mark.resource_intensive
+    def test_restart_failed_replace(self):
         """
         Test that if a node fails to replace, it can join the cluster even if the data is wiped.
         """
         self._test_restart_failed_replace(mode='wipe')
 
     def _test_restart_failed_replace(self, mode):
-        self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Error while waiting on bootstrap to complete']
+        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
+            r'Error while waiting on bootstrap to complete']
+
         self._setup(n=3, enable_byteman=True)
         self._insert_data(n="1k")
 
@@ -445,7 +463,7 @@ class TestReplaceAddress(BaseReplaceAddressTest):
 
         self._stop_node_to_replace()
 
-        debug("Submitting byteman script to make stream fail")
+        logger.debug("Submitting byteman script to make stream fail")
 
         if self.cluster.version() < '4.0':
             
self.query_node.byteman_submit(['./byteman/pre4.0/stream_failure.btm'])
@@ -460,7 +478,7 @@ class TestReplaceAddress(BaseReplaceAddressTest):
 
         if mode == 'reset_resume_state':
             mark = self.replacement_node.mark_log()
-            debug("Restarting replacement node with 
-Dcassandra.reset_bootstrap_progress=true")
+            logger.debug("Restarting replacement node with 
-Dcassandra.reset_bootstrap_progress=true")
             # restart replacement node with resetting bootstrap state
             self.replacement_node.stop()
             self.replacement_node.start(jvm_args=[
@@ -471,7 +489,7 @@ class TestReplaceAddress(BaseReplaceAddressTest):
             # check if we reset bootstrap state
             self.replacement_node.watch_log_for("Resetting bootstrap progress 
to start fresh", from_mark=mark)
         elif mode == 'resume':
-            debug("Resuming failed bootstrap")
+            logger.debug("Resuming failed bootstrap")
             self.replacement_node.nodetool('bootstrap resume')
             # check if we skipped already retrieved ranges
             self.replacement_node.watch_log_for("already available. Skipping 
streaming.")
@@ -479,37 +497,39 @@ class TestReplaceAddress(BaseReplaceAddressTest):
         elif mode == 'wipe':
             self.replacement_node.stop()
 
-            debug("Waiting other nodes to detect node stopped")
-            self.query_node.watch_log_for("FatClient /{} has been silent for 
30000ms, removing from gossip".format(self.replacement_node.address()), 
timeout=60)
-            self.query_node.watch_log_for("Node /{} failed during 
replace.".format(self.replacement_node.address()), timeout=60, 
filename='debug.log')
+            logger.debug("Waiting other nodes to detect node stopped")
+            self.query_node.watch_log_for("FatClient /{} has been silent for 
30000ms, removing from gossip".format(self.replacement_node.address()), 
timeout=120)
+            self.query_node.watch_log_for("Node /{} failed during 
replace.".format(self.replacement_node.address()), timeout=120, 
filename='debug.log')
 
-            debug("Restarting node after wiping data")
+            logger.debug("Restarting node after wiping data")
             self._cleanup(self.replacement_node)
             self.replacement_node.start(jvm_args=["-Dcassandra.replace_address_first_boot={}".format(self.replaced_node.address())],
                                         wait_for_binary_proto=True)
         else:
-            raise RuntimeError('invalid mode value {mode}'.format(mode))
+            raise RuntimeError('invalid mode value {mode}'.format(mode=mode))
 
         # check if bootstrap succeeded
         assert_bootstrap_state(self, self.replacement_node, 'COMPLETED')
 
-        debug("Bootstrap finished successully, verifying data.")
+        logger.debug("Bootstrap finished successully, verifying data.")
 
         self._verify_data(initial_data)
 
-    def replace_with_insufficient_replicas_test(self):
+    def test_replace_with_insufficient_replicas(self):
         """
         Test that replace fails when there are insufficient replicas
         @jira_ticket CASSANDRA-11848
         """
-        self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Unable to find sufficient sources for streaming range']
+        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
+            r'Unable to find sufficient sources for streaming range']
+
         self._setup(n=3)
         self._insert_data(rf=2)
 
         self._stop_node_to_replace()
 
         # stop other replica
-        debug("Stopping other replica")
+        logger.debug("Stopping other replica")
         self.query_node.stop(wait_other_notice=True)
 
         self._do_replace(wait_for_binary_proto=False, wait_other_notice=False)
@@ -518,7 +538,9 @@ class TestReplaceAddress(BaseReplaceAddressTest):
         self.replacement_node.watch_log_for("Unable to find sufficient sources 
for streaming range")
         assert_not_running(self.replacement_node)
 
-    def multi_dc_replace_with_rf1_test(self):
+    @flaky
+    @pytest.mark.vnodes
+    def test_multi_dc_replace_with_rf1(self):
         """
         Test that multi-dc replace works when rf=1 on each dc
         """
@@ -550,7 +572,11 @@ class TestReplaceAddress(BaseReplaceAddressTest):
             stress_config.write(yaml_config)
             stress_config.flush()
             self.query_node.stress(['user', 'profile=' + stress_config.name, 'n=10k', 'no-warmup',
-                                    'ops(insert=1)', '-rate', 'threads=50'])
+                                    'ops(insert=1)', '-rate', 'threads=5'])
+            # need to sleep for a bit to try and let things catch up as we frequently do a lot of
+            # GC after the stress invocation above causing the next step of the test to timeout.
+            # and then flush to make sure we really are fully caught up
+            time.sleep(30)
 
         # Save initial data
         table_name = 'keyspace1.users'
@@ -563,13 +589,13 @@ class TestReplaceAddress(BaseReplaceAddressTest):
         assert_bootstrap_state(self, self.replacement_node, 'COMPLETED')
 
         # Check that keyspace was replicated from dc1 to dc2
-        self.assertFalse(self.replacement_node.grep_log("Unable to find sufficient sources for streaming range"))
+        assert not self.replacement_node.grep_log("Unable to find sufficient sources for streaming range")

         self._verify_data(initial_data, table=table_name, cl=ConsistencyLevel.LOCAL_ONE)
 
     def _cleanup(self, node):
         commitlog_dir = os.path.join(node.get_path(), 'commitlogs')
         for data_dir in node.data_directories():
-            debug("Deleting {}".format(data_dir))
+            logger.debug("Deleting {}".format(data_dir))
             rmtree(data_dir)
         rmtree(commitlog_dir)

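For readers unfamiliar with the fixture-based replacement for the old `ignore_log_patterns` class attribute seen throughout the diff above, here is a minimal, self-contained sketch of the pattern under stated assumptions: in the real tests `fixture_dtest_setup` is supplied by the dtest conftest, so a tiny stand-in object is used here purely to keep the example runnable; class and test names are illustrative only.

    import pytest

    class FakeDtestSetup:
        """Stand-in for the real fixture_dtest_setup object provided by the dtest conftest."""
        ignore_log_patterns = ()

    @pytest.fixture
    def fixture_dtest_setup():
        return FakeDtestSetup()

    class BaseSketch:
        @pytest.fixture(autouse=True)
        def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
            # runs automatically before every test in subclasses, so each test
            # starts with a base tuple of ignorable log patterns
            fixture_dtest_setup.ignore_log_patterns = (
                r'Streaming error occurred',
                r'failed stream session',
            )
            self.fixture_dtest_setup = fixture_dtest_setup

    class TestSketch(BaseSketch):
        def test_can_extend_patterns(self):
            # individual tests extend, rather than replace, the base patterns
            self.fixture_dtest_setup.ignore_log_patterns = list(
                self.fixture_dtest_setup.ignore_log_patterns) + [
                r'Exception encountered during startup']
            assert r'failed stream session' in self.fixture_dtest_setup.ignore_log_patterns
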
