http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/scrub_test.py
----------------------------------------------------------------------
diff --git a/scrub_test.py b/scrub_test.py
index 4f9d22b..2df11c7 100644
--- a/scrub_test.py
+++ b/scrub_test.py
@@ -4,28 +4,33 @@ import re
 import subprocess
 import time
 import uuid
-
+import pytest
 import parse
+import logging
+
 from ccmlib import common
 
-from dtest import Tester, debug, create_ks, create_cf
+from dtest import Tester, create_ks, create_cf
 from tools.assertions import assert_length_equal, assert_stderr_clean
-from tools.decorators import since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 KEYSPACE = 'ks'
 
 
 class TestHelper(Tester):
 
-    def setUp(self):
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_set_cluster_settings(self, fixture_dtest_setup):
         """
         disable JBOD configuration for scrub tests.
         range-aware JBOD can skip generation in SSTable,
         and some tests rely on generation numbers/
         (see CASSANDRA-11693 and increase_sstable_generations)
         """
-        super(TestHelper, self).setUp()
-        self.cluster.set_datadir_count(1)
+        fixture_dtest_setup.cluster.set_datadir_count(1)
+
 
     def get_table_paths(self, table):
         """
@@ -56,13 +61,13 @@ class TestHelper(Tester):
         Return the sstable files at a specific location
         """
         ret = []
-        debug('Checking sstables in {}'.format(paths))
+        logger.debug('Checking sstables in {}'.format(paths))
 
         for ext in ('*.db', '*.txt', '*.adler32', '*.sha1'):
             for path in paths:
                 for fname in glob.glob(os.path.join(path, ext)):
                     bname = os.path.basename(fname)
-                    debug('Found sstable file {}'.format(bname))
+                    logger.debug('Found sstable file {}'.format(bname))
                     ret.append(bname)
         return ret
 
@@ -77,7 +82,7 @@ class TestHelper(Tester):
                 for path in paths:
                     fullname = os.path.join(path, fname)
                     if (os.path.exists(fullname)):
-                        debug('Deleting {}'.format(fullname))
+                        logger.debug('Deleting {}'.format(fullname))
                         os.remove(fullname)
 
     def get_sstables(self, table, indexes):
@@ -86,12 +91,12 @@ class TestHelper(Tester):
         """
         sstables = {}
         table_sstables = self.get_sstable_files(self.get_table_paths(table))
-        self.assertGreater(len(table_sstables), 0)
+        assert len(table_sstables) > 0
         sstables[table] = sorted(table_sstables)
 
         for index in indexes:
             index_sstables = self.get_sstable_files(self.get_index_paths(table, index))
-            self.assertGreater(len(index_sstables), 0)
+            assert len(index_sstables) > 0
             sstables[index] = sorted('{}/{}'.format(index, sstable) for sstable in index_sstables)
 
         return sstables
@@ -112,16 +117,16 @@ class TestHelper(Tester):
         node1 = self.cluster.nodelist()[0]
         env = common.make_cassandra_env(node1.get_install_cassandra_root(), node1.get_node_cassandra_root())
         scrub_bin = node1.get_tool('sstablescrub')
-        debug(scrub_bin)
+        logger.debug(scrub_bin)
 
         args = [scrub_bin, ks, cf]
         p = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         out, err = p.communicate()
-        debug(out)
+        logger.debug(out.decode("utf-8"))
         # if we have less than 64G free space, we get this warning - ignore it
-        if err and "Consider adding more capacity" not in err:
-            debug(err)
-            assert_stderr_clean(err)
+        if err and "Consider adding more capacity" not in err.decode("utf-8"):
+            logger.debug(err.decode("utf-8"))
+            assert_stderr_clean(err.decode("utf-8"))
 
     def perform_node_tool_cmd(self, cmd, table, indexes):
         """
@@ -169,11 +174,11 @@ class TestHelper(Tester):
         After finding the number of existing sstables, increase all of the
         generations by that amount.
         """
-        for table_or_index, table_sstables in sstables.items():
+        for table_or_index, table_sstables in list(sstables.items()):
             increment_by = len(set(parse.search('{}-{increment_by}-{suffix}.{file_extention}', s).named['increment_by'] for s in table_sstables))
             sstables[table_or_index] = [self.increment_generation_by(s, increment_by) for s in table_sstables]
 
-        debug('sstables after increment {}'.format(str(sstables)))
+        logger.debug('sstables after increment {}'.format(str(sstables)))
 
 
 @since('2.2')
@@ -228,18 +233,18 @@ class TestScrubIndexes(TestHelper):
         scrubbed_sstables = self.scrub('users', 'gender_idx', 'state_idx', 'birth_year_idx')
 
         self.increase_sstable_generations(initial_sstables)
-        self.assertEqual(initial_sstables, scrubbed_sstables)
+        assert initial_sstables == scrubbed_sstables
 
         users = self.query_users(session)
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
         # Scrub and check sstables and data again
         scrubbed_sstables = self.scrub('users', 'gender_idx', 'state_idx', 'birth_year_idx')
         self.increase_sstable_generations(initial_sstables)
-        self.assertEqual(initial_sstables, scrubbed_sstables)
+        assert initial_sstables == scrubbed_sstables
 
         users = self.query_users(session)
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
         # Restart and check data again
         cluster.stop()
@@ -249,7 +254,7 @@ class TestScrubIndexes(TestHelper):
         session.execute('USE {}'.format(KEYSPACE))
 
         users = self.query_users(session)
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
     def test_standalone_scrub(self):
         cluster = self.cluster
@@ -269,14 +274,14 @@ class TestScrubIndexes(TestHelper):
 
         scrubbed_sstables = self.standalonescrub('users', 'gender_idx', 'state_idx', 'birth_year_idx')
         self.increase_sstable_generations(initial_sstables)
-        self.assertEqual(initial_sstables, scrubbed_sstables)
+        assert initial_sstables == scrubbed_sstables
 
         cluster.start()
         session = self.patient_cql_connection(node1)
         session.execute('USE {}'.format(KEYSPACE))
 
         users = self.query_users(session)
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
     def test_scrub_collections_table(self):
         cluster = self.cluster
@@ -297,25 +302,25 @@ class TestScrubIndexes(TestHelper):
             session.execute(("UPDATE users set uuids = [{id}] where user_id = 
{user_id}").format(id=_id, user_id=user_uuid))
 
         initial_users = list(session.execute(("SELECT * from users where uuids 
contains {some_uuid}").format(some_uuid=_id)))
-        self.assertEqual(num_users, len(initial_users))
+        assert num_users == len(initial_users)
 
         initial_sstables = self.flush('users', 'user_uuids_idx')
         scrubbed_sstables = self.scrub('users', 'user_uuids_idx')
 
         self.increase_sstable_generations(initial_sstables)
-        self.assertEqual(initial_sstables, scrubbed_sstables)
+        assert initial_sstables == scrubbed_sstables
 
         users = list(session.execute(("SELECT * from users where uuids contains {some_uuid}").format(some_uuid=_id)))
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
         scrubbed_sstables = self.scrub('users', 'user_uuids_idx')
 
         self.increase_sstable_generations(initial_sstables)
-        self.assertEqual(initial_sstables, scrubbed_sstables)
+        assert initial_sstables == scrubbed_sstables
 
         users = list(session.execute(("SELECT * from users where uuids contains {some_uuid}").format(some_uuid=_id)))
 
-        self.assertListEqual(initial_users, users)
+        assert initial_users == users
 
 
 class TestScrub(TestHelper):
@@ -365,18 +370,18 @@ class TestScrub(TestHelper):
         scrubbed_sstables = self.scrub('users')
 
         self.increase_sstable_generations(initial_sstables)
-        self.assertEqual(initial_sstables, scrubbed_sstables)
+        assert initial_sstables == scrubbed_sstables
 
         users = self.query_users(session)
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
         # Scrub and check sstables and data again
         scrubbed_sstables = self.scrub('users')
         self.increase_sstable_generations(initial_sstables)
-        self.assertEqual(initial_sstables, scrubbed_sstables)
+        assert initial_sstables == scrubbed_sstables
 
         users = self.query_users(session)
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
         # Restart and check data again
         cluster.stop()
@@ -386,7 +391,7 @@ class TestScrub(TestHelper):
         session.execute('USE {}'.format(KEYSPACE))
 
         users = self.query_users(session)
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
     def test_standalone_scrub(self):
         cluster = self.cluster
@@ -406,14 +411,14 @@ class TestScrub(TestHelper):
 
         scrubbed_sstables = self.standalonescrub('users')
         self.increase_sstable_generations(initial_sstables)
-        self.assertEqual(initial_sstables, scrubbed_sstables)
+        assert initial_sstables == scrubbed_sstables
 
         cluster.start()
         session = self.patient_cql_connection(node1)
         session.execute('USE {}'.format(KEYSPACE))
 
         users = self.query_users(session)
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
     def test_standalone_scrub_essential_files_only(self):
         cluster = self.cluster
@@ -435,14 +440,14 @@ class TestScrub(TestHelper):
 
         scrubbed_sstables = self.standalonescrub('users')
         self.increase_sstable_generations(initial_sstables)
-        self.assertEqual(initial_sstables, scrubbed_sstables)
+        assert initial_sstables == scrubbed_sstables
 
         cluster.start()
         session = self.patient_cql_connection(node1)
         session.execute('USE {}'.format(KEYSPACE))
 
         users = self.query_users(session)
-        self.assertEqual(initial_users, users)
+        assert initial_users == users
 
     def test_scrub_with_UDT(self):
         """
@@ -460,4 +465,4 @@ class TestScrub(TestHelper):
         node1.nodetool("scrub")
         time.sleep(2)
         match = node1.grep_log("org.apache.cassandra.serializers.MarshalException: Not enough bytes to read a set")
-        self.assertEqual(len(match), 0)
+        assert len(match) == 0
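
Note on the pattern applied in scrub_test.py above: the commit's pytest migration replaces unittest-style setUp() overrides with function-scoped autouse fixtures that operate on the shared fixture_dtest_setup, and replaces self.assert*() calls with plain assert statements. A minimal sketch of that pattern, assuming the dtest-provided Tester base class and fixture_dtest_setup fixture (the test body and sstable name are illustrative only):

    import pytest
    from dtest import Tester

    class TestExample(Tester):
        @pytest.fixture(scope='function', autouse=True)
        def fixture_set_cluster_settings(self, fixture_dtest_setup):
            # Runs before every test in the class; replaces the old setUp()
            # that called super().setUp() and then tweaked the cluster.
            fixture_dtest_setup.cluster.set_datadir_count(1)

        def test_example(self):
            sstables = ['na-1-big-Data.db']   # illustrative value
            # unittest style: self.assertGreater(len(sstables), 0)
            assert len(sstables) > 0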

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/secondary_indexes_test.py
----------------------------------------------------------------------
diff --git a/secondary_indexes_test.py b/secondary_indexes_test.py
index 9cb6cc0..9b0f326 100644
--- a/secondary_indexes_test.py
+++ b/secondary_indexes_test.py
@@ -3,7 +3,10 @@ import random
 import re
 import time
 import uuid
-from unittest import skipIf
+import pytest
+import logging
+
+from flaky import flaky
 
 from cassandra import InvalidRequest
 from cassandra.concurrent import (execute_concurrent,
@@ -11,12 +14,13 @@ from cassandra.concurrent import (execute_concurrent,
 from cassandra.protocol import ConfigurationException
 from cassandra.query import BatchStatement, SimpleStatement
 
-from dtest import (DISABLE_VNODES, OFFHEAP_MEMTABLES, Tester, debug, CASSANDRA_VERSION_FROM_BUILD, create_ks, create_cf)
+from dtest import Tester, create_ks, create_cf
 from tools.assertions import assert_bootstrap_state, assert_invalid, assert_none, assert_one, assert_row_count
 from tools.data import block_until_index_is_built, rows_to_list
-from tools.decorators import since
 from tools.misc import new_node
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 class TestSecondaryIndexes(Tester):
 
@@ -30,7 +34,7 @@ class TestSecondaryIndexes(Tester):
             files.extend(os.listdir(index_sstables_dir))
         return set(files)
 
-    def data_created_before_index_not_returned_in_where_query_test(self):
+    def test_data_created_before_index_not_returned_in_where_query(self):
         """
         @jira_ticket CASSANDRA-3367
         """
@@ -79,9 +83,10 @@ class TestSecondaryIndexes(Tester):
         session.execute("CREATE INDEX b_index ON ks.cf (b);")
         num_rows = 100
         for i in range(num_rows):
-            indexed_value = i % (num_rows / 3)
+            indexed_value = i % (num_rows // 3)
             # use the same indexed value three times
-            session.execute("INSERT INTO ks.cf (a, b) VALUES ('%d', '%d');" % (i, indexed_value))
+            session.execute("INSERT INTO ks.cf (a, b) VALUES ('{a}', '{b}');"
+                            .format(a=i, b=indexed_value))
 
         cluster.flush()
 
@@ -99,31 +104,32 @@ class TestSecondaryIndexes(Tester):
                 if match:
                     concurrency = int(match.group(1))
                     expected_per_range = float(match.group(2))
-                    self.assertTrue(concurrency > 1, "Expected more than 1 concurrent range request, got %d" % concurrency)
-                    self.assertTrue(expected_per_range > 0)
+                    assert concurrency > 1, "Expected more than 1 concurrent range request, got %d" % concurrency
+                    assert expected_per_range > 0
                     break
             else:
                 self.fail("Didn't find matching trace event")
 
         query = SimpleStatement("SELECT * FROM ks.cf WHERE b='1';")
         result = session.execute(query, trace=True)
-        self.assertEqual(3, len(list(result)))
+        assert 3 == len(list(result))
         check_trace_events(result.get_query_trace())
 
         query = SimpleStatement("SELECT * FROM ks.cf WHERE b='1' LIMIT 100;")
         result = session.execute(query, trace=True)
-        self.assertEqual(3, len(list(result)))
+        assert 3 == len(list(result))
         check_trace_events(result.get_query_trace())
 
         query = SimpleStatement("SELECT * FROM ks.cf WHERE b='1' LIMIT 3;")
         result = session.execute(query, trace=True)
-        self.assertEqual(3, len(list(result)))
+        assert 3 == len(list(result))
         check_trace_events(result.get_query_trace())
 
         for limit in (1, 2):
             result = list(session.execute("SELECT * FROM ks.cf WHERE b='1' LIMIT %d;" % (limit,)))
-            self.assertEqual(limit, len(result))
+            assert limit == len(result)
 
+    @flaky(3)
     def test_6924_dropping_ks(self):
         """
         @jira_ticket CASSANDRA-6924
@@ -152,7 +158,7 @@ class TestSecondaryIndexes(Tester):
         # This only occurs when dropping and recreating with
         # the same name, so loop through this test a few times:
         for i in range(10):
-            debug("round %s" % i)
+            logger.debug("round %s" % i)
             try:
                 session.execute("DROP KEYSPACE ks")
             except ConfigurationException:
@@ -170,8 +176,9 @@ class TestSecondaryIndexes(Tester):
 
             rows = session.execute("select count(*) from ks.cf WHERE 
col1='asdf'")
             count = rows[0][0]
-            self.assertEqual(count, 10)
+            assert count == 10
 
+    @flaky
     def test_6924_dropping_cf(self):
         """
         @jira_ticket CASSANDRA-6924
@@ -190,7 +197,7 @@ class TestSecondaryIndexes(Tester):
         # This only occurs when dropping and recreating with
         # the same name, so loop through this test a few times:
         for i in range(10):
-            debug("round %s" % i)
+            logger.debug("round %s" % i)
             try:
                 session.execute("DROP COLUMNFAMILY ks.cf")
             except InvalidRequest:
@@ -207,7 +214,7 @@ class TestSecondaryIndexes(Tester):
 
             rows = session.execute("select count(*) from ks.cf WHERE 
col1='asdf'")
             count = rows[0][0]
-            self.assertEqual(count, 10)
+            assert count == 10
 
     def test_8280_validate_indexed_values(self):
         """
@@ -282,21 +289,8 @@ class TestSecondaryIndexes(Tester):
             pass
 
     def wait_for_schema_agreement(self, session):
-        rows = list(session.execute("SELECT schema_version FROM system.local"))
-        local_version = rows[0]
-
-        all_match = True
-        rows = list(session.execute("SELECT schema_version FROM system.peers"))
-        for peer_version in rows:
-            if peer_version != local_version:
-                all_match = False
-                break
-
-        if all_match:
-            return
-        else:
-            time.sleep(1)
-            self.wait_for_schema_agreement(session)
+        if not session.cluster.control_connection.wait_for_schema_agreement(wait_time=120):
+            raise AssertionError("Failed to reach schema agreement")
 
     @since('3.0')
     def test_manual_rebuild_index(self):
@@ -316,18 +310,18 @@ class TestSecondaryIndexes(Tester):
         block_until_index_is_built(node1, session, 'keyspace1', 'standard1', 'ix_c0')
 
         stmt = session.prepare('select * from standard1 where "C0" = ?')
-        self.assertEqual(1, len(list(session.execute(stmt, [lookup_value]))))
+        assert 1 == len(list(session.execute(stmt, [lookup_value])))
         before_files = self._index_sstables_files(node1, 'keyspace1', 'standard1', 'ix_c0')
 
         node1.nodetool("rebuild_index keyspace1 standard1 ix_c0")
         block_until_index_is_built(node1, session, 'keyspace1', 'standard1', 'ix_c0')
 
         after_files = self._index_sstables_files(node1, 'keyspace1', 'standard1', 'ix_c0')
-        self.assertNotEqual(before_files, after_files)
-        self.assertEqual(1, len(list(session.execute(stmt, [lookup_value]))))
+        assert before_files != after_files
+        assert 1 == len(list(session.execute(stmt, [lookup_value])))
 
         # verify that only the expected row is present in the build indexes table
-        self.assertEqual(1, len(list(session.execute("""SELECT * FROM system."IndexInfo";"""))))
+        assert 1 == len(list(session.execute("""SELECT * FROM system."IndexInfo";""")))
 
     @since('4.0')
     def test_failing_manual_rebuild_index(self):
@@ -355,12 +349,12 @@ class TestSecondaryIndexes(Tester):
         # Simulate a failing index rebuild
         before_files = self._index_sstables_files(node, 'k', 't', 'idx')
         node.byteman_submit(['./byteman/index_build_failure.btm'])
-        with self.assertRaises(Exception):
+        with pytest.raises(Exception):
             node.nodetool("rebuild_index k t idx")
         after_files = self._index_sstables_files(node, 'k', 't', 'idx')
 
         # Verify that the index is not rebuilt, not marked as built, and it still can answer queries
-        self.assertEqual(before_files, after_files)
+        assert before_files == after_files
         assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""")
         assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
 
@@ -374,19 +368,19 @@ class TestSecondaryIndexes(Tester):
         after_files = self._index_sstables_files(node, 'k', 't', 'idx')
 
         # Verify that, the index is rebuilt, marked as built, and it can answer queries
-        self.assertNotEqual(before_files, after_files)
+        assert before_files != after_files
         assert_one(session, """SELECT table_name, index_name FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
         assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
 
         # Simulate another failing index rebuild
         before_files = self._index_sstables_files(node, 'k', 't', 'idx')
         node.byteman_submit(['./byteman/index_build_failure.btm'])
-        with self.assertRaises(Exception):
+        with pytest.raises(Exception):
             node.nodetool("rebuild_index k t idx")
         after_files = self._index_sstables_files(node, 'k', 't', 'idx')
 
         # Verify that the index is not rebuilt, not marked as built, and it still can answer queries
-        self.assertEqual(before_files, after_files)
+        assert before_files == after_files
         assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""")
         assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
 
@@ -397,7 +391,7 @@ class TestSecondaryIndexes(Tester):
         after_files = self._index_sstables_files(node, 'k', 't', 'idx')
 
         # Verify that the index is rebuilt, marked as built, and it can answer queries
-        self.assertNotEqual(before_files, after_files)
+        assert before_files != after_files
         assert_one(session, """SELECT table_name, index_name FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
         assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
 
@@ -456,22 +450,22 @@ class TestSecondaryIndexes(Tester):
         session.execute("CREATE TABLE k.t (k int PRIMARY KEY, v int)")
         session.execute("INSERT INTO k.t(k, v) VALUES (0, 1)")
 
-        debug("Create the index")
+        logger.debug("Create the index")
         session.execute("CREATE INDEX idx ON k.t(v)")
         block_until_index_is_built(node, session, 'k', 't', 'idx')
         before_files = self._index_sstables_files(node, 'k', 't', 'idx')
 
-        debug("Verify the index is marked as built and it can be queried")
+        logger.debug("Verify the index is marked as built and it can be queried")
         assert_one(session, """SELECT table_name, index_name FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
         assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
 
-        debug("Restart the node and verify the index build is not submitted")
+        logger.debug("Restart the node and verify the index build is not submitted")
         node.stop()
         node.start(wait_for_binary_proto=True)
         after_files = self._index_sstables_files(node, 'k', 't', 'idx')
-        self.assertEqual(before_files, after_files)
+        assert before_files == after_files
 
-        debug("Verify the index is still marked as built and it can be queried")
+        logger.debug("Verify the index is still marked as built and it can be queried")
         session = self.patient_cql_connection(node)
         assert_one(session, """SELECT table_name, index_name FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
         assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
@@ -496,7 +490,7 @@ class TestSecondaryIndexes(Tester):
         session.execute("INSERT INTO tbl (id, c0, c1, c2) values (uuid(), 'a', 
'e', 'f');")
 
         rows = list(session.execute("SELECT * FROM tbl WHERE c0 = 'a';"))
-        self.assertEqual(4, len(rows))
+        assert 4 == len(rows)
 
         stmt = "SELECT * FROM tbl WHERE c0 = 'a' AND c1 = 'b';"
         assert_invalid(session, stmt, "Cannot execute this query as it might involve data filtering and thus may have "
@@ -504,7 +498,7 @@ class TestSecondaryIndexes(Tester):
                                       "performance unpredictability, use ALLOW FILTERING")

         rows = list(session.execute("SELECT * FROM tbl WHERE c0 = 'a' AND c1 = 'b' ALLOW FILTERING;"))
-        self.assertEqual(2, len(rows))
+        assert 2 == len(rows)
 
     @since('3.0')
     def test_only_coordinator_chooses_index_for_query(self):
@@ -523,7 +517,7 @@ class TestSecondaryIndexes(Tester):
         session.execute("CREATE INDEX b_index ON ks.cf (b);")
         num_rows = 100
         for i in range(num_rows):
-            indexed_value = i % (num_rows / 3)
+            indexed_value = i % (num_rows // 3)
             # use the same indexed value three times
             session.execute("INSERT INTO ks.cf (a, b) VALUES ('{a}', '{b}');"
                             .format(a=i, b=indexed_value))
@@ -560,7 +554,7 @@ class TestSecondaryIndexes(Tester):
                               actual=match_counts[event_source], all=match_counts))
 
         def retry_on_failure(trace, regex, expected_matches, match_counts, event_source, min_expected, max_expected):
-            debug("Trace event inspection did not match expected, sleeping before re-fetching trace events. "
+            logger.debug("Trace event inspection did not match expected, sleeping before re-fetching trace events. "
                   "Expected: {expected} Actual: {actual}".format(expected=expected_matches, actual=match_counts))
             time.sleep(2)
             trace.populate(max_wait=2.0)
@@ -568,7 +562,7 @@ class TestSecondaryIndexes(Tester):
 
         query = SimpleStatement("SELECT * FROM ks.cf WHERE b='1';")
         result = session.execute(query, trace=True)
-        self.assertEqual(3, len(list(result)))
+        assert 3 == len(list(result))
 
         trace = result.get_query_trace()
 
@@ -586,7 +580,7 @@ class TestSecondaryIndexes(Tester):
                            [("127.0.0.1", 1, 200), ("127.0.0.2", 1, 200), 
("127.0.0.3", 1, 200)],
                            retry_on_failure)
 
-    @skipIf(DISABLE_VNODES, "Test should only run with vnodes")
+    @pytest.mark.vnodes
     def test_query_indexes_with_vnodes(self):
         """
         Verifies correct query behaviour in the presence of vnodes
@@ -597,7 +591,7 @@ class TestSecondaryIndexes(Tester):
         node1, node2 = cluster.nodelist()
         session = self.patient_cql_connection(node1)
         session.execute("CREATE KEYSPACE ks WITH REPLICATION = {'class': 
'SimpleStrategy', 'replication_factor': '1'};")
-        session.execute("CREATE TABLE ks.compact_table (a int PRIMARY KEY, b 
int) WITH COMPACT STORAGE;")
+        session.execute("CREATE TABLE ks.compact_table (a int PRIMARY KEY, b 
int);")
         session.execute("CREATE INDEX keys_index ON ks.compact_table (b);")
         session.execute("CREATE TABLE ks.regular_table (a int PRIMARY KEY, b 
int)")
         session.execute("CREATE INDEX composites_index on ks.regular_table 
(b)")
@@ -605,7 +599,7 @@ class TestSecondaryIndexes(Tester):
         for node in cluster.nodelist():
             block_until_index_is_built(node, session, 'ks', 'regular_table', 'composites_index')
 
-        insert_args = [(i, i % 2) for i in xrange(100)]
+        insert_args = [(i, i % 2) for i in range(100)]
         execute_concurrent_with_args(session,
                                      session.prepare("INSERT INTO 
ks.compact_table (a, b) VALUES (?, ?)"),
                                      insert_args)
@@ -614,9 +608,9 @@ class TestSecondaryIndexes(Tester):
                                      insert_args)
 
         res = session.execute("SELECT * FROM ks.compact_table WHERE b = 0")
-        self.assertEqual(len(rows_to_list(res)), 50)
+        assert len(rows_to_list(res)) == 50
         res = session.execute("SELECT * FROM ks.regular_table WHERE b = 0")
-        self.assertEqual(len(rows_to_list(res)), 50)
+        assert len(rows_to_list(res)) == 50
 
 
 class TestSecondaryIndexesOnCollections(Tester):
@@ -649,7 +643,7 @@ class TestSecondaryIndexesOnCollections(Tester):
         results = execute_concurrent(session, cmds * 5, raise_on_first_error=True, concurrency=200)
 
         for (success, result) in results:
-            self.assertTrue(success, "didn't get success on insert: {0}".format(result))
+            assert success, "didn't get success on insert: {0}".format(result)
 
         session.execute("CREATE INDEX idx_single_tuple ON 
simple_with_tuple(single_tuple);")
         session.execute("CREATE INDEX idx_double_tuple ON 
simple_with_tuple(double_tuple);")
@@ -659,25 +653,25 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         # check if indexes work on existing data
         for n in range(50):
-            self.assertEqual(5, len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n)))))
-            self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where single_tuple = (-1);".format(n)))))
-            self.assertEqual(5, len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n)))))
-            self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},-1);".format(n)))))
-            self.assertEqual(5, len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n)))))
-            self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},-1);".format(n)))))
-            self.assertEqual(5, len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n)))))
-            self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},-1));".format(n)))))
+            assert 5 == len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n))))
+            assert 0 == len(list(session.execute("select * from simple_with_tuple where single_tuple = (-1);".format(n))))
+            assert 5 == len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n))))
+            assert 0 == len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},-1);".format(n))))
+            assert 5 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n))))
+            assert 0 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},-1);".format(n))))
+            assert 5 == len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n))))
+            assert 0 == len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},-1));".format(n))))
 
         # check if indexes work on new data inserted after index creation
         results = execute_concurrent(session, cmds * 3, raise_on_first_error=True, concurrency=200)
         for (success, result) in results:
-            self.assertTrue(success, "didn't get success on insert: {0}".format(result))
+            assert success, "didn't get success on insert: {0}".format(result)
         time.sleep(5)
         for n in range(50):
-            self.assertEqual(8, len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n)))))
-            self.assertEqual(8, len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n)))))
-            self.assertEqual(8, len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n)))))
-            self.assertEqual(8, len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n)))))
+            assert 8 == len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n))))
+            assert 8 == len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n))))
+            assert 8 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n))))
+            assert 8 == len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n))))
 
         # check if indexes work on mutated data
         for n in range(5):
@@ -698,15 +692,15 @@ class TestSecondaryIndexesOnCollections(Tester):
                 session.execute("update simple_with_tuple set nested_one = 
(-999,(-999,-999)) where id = {0}".format(row.id))
 
         for n in range(5):
-            self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n)))))
-            self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n)))))
-            self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n)))))
-            self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n)))))
+            assert 0 == len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n))))
+            assert 0 == len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n))))
+            assert 0 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n))))
+            assert 0 == len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n))))

-        self.assertEqual(40, len(list(session.execute("select * from simple_with_tuple where single_tuple = (-999);"))))
-        self.assertEqual(40, len(list(session.execute("select * from simple_with_tuple where double_tuple = (-999,-999);"))))
-        self.assertEqual(40, len(list(session.execute("select * from simple_with_tuple where triple_tuple = (-999,-999,-999);"))))
-        self.assertEqual(40, len(list(session.execute("select * from simple_with_tuple where nested_one = (-999,(-999,-999));"))))
+        assert 40 == len(list(session.execute("select * from simple_with_tuple where single_tuple = (-999);")))
+        assert 40 == len(list(session.execute("select * from simple_with_tuple where double_tuple = (-999,-999);")))
+        assert 40 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = (-999,-999,-999);")))
+        assert 40 == len(list(session.execute("select * from simple_with_tuple where nested_one = (-999,(-999,-999));")))
 
     def test_list_indexes(self):
         """
@@ -731,7 +725,7 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         stmt = ("SELECT * from list_index_search.users where uuids contains 
{some_uuid}").format(some_uuid=uuid.uuid4())
         row = list(session.execute(stmt))
-        self.assertEqual(0, len(row))
+        assert 0 == len(row)
 
         # add a row which doesn't specify data for the indexed column, and query again
         user1_uuid = uuid.uuid4()
@@ -742,7 +736,7 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         stmt = ("SELECT * from list_index_search.users where uuids contains 
{some_uuid}").format(some_uuid=uuid.uuid4())
         row = list(session.execute(stmt))
-        self.assertEqual(0, len(row))
+        assert 0 == len(row)
 
         _id = uuid.uuid4()
         # alter the row to add a single item to the indexed list
@@ -752,7 +746,7 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         stmt = ("SELECT * from list_index_search.users where uuids contains 
{some_uuid}").format(some_uuid=_id)
         row = list(session.execute(stmt))
-        self.assertEqual(1, len(row))
+        assert 1 == len(row)
 
         # add a bunch of user records and query them back
         shared_uuid = uuid.uuid4()  # this uuid will be on all records
@@ -779,7 +773,7 @@ class TestSecondaryIndexesOnCollections(Tester):
         stmt = ("SELECT * from list_index_search.users where uuids contains 
{shared_uuid}").format(shared_uuid=shared_uuid)
         rows = list(session.execute(stmt))
         result = [row for row in rows]
-        self.assertEqual(50000, len(result))
+        assert 50000 == len(result)
 
         # shuffle the log in-place, and double-check a slice of records by querying the secondary index
         random.shuffle(log)
@@ -789,14 +783,14 @@ class TestSecondaryIndexesOnCollections(Tester):
                     ).format(unshared_uuid=log_entry['unshared_uuid'])
             rows = list(session.execute(stmt))
 
-            self.assertEqual(1, len(rows))
+            assert 1 == len(rows)
 
             db_user_id, db_email, db_uuids = rows[0]
 
-            self.assertEqual(db_user_id, log_entry['user_id'])
-            self.assertEqual(db_email, log_entry['email'])
-            self.assertEqual(str(db_uuids[0]), str(shared_uuid))
-            self.assertEqual(str(db_uuids[1]), str(log_entry['unshared_uuid']))
+            assert db_user_id == log_entry['user_id']
+            assert db_email == log_entry['email']
+            assert str(db_uuids[0]) == str(shared_uuid)
+            assert str(db_uuids[1]) == str(log_entry['unshared_uuid'])
 
     def test_set_indexes(self):
         """
@@ -820,7 +814,7 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         stmt = ("SELECT * from set_index_search.users where uuids contains 
{some_uuid}").format(some_uuid=uuid.uuid4())
         row = list(session.execute(stmt))
-        self.assertEqual(0, len(row))
+        assert 0 == len(row)
 
         # add a row which doesn't specify data for the indexed column, and query again
         user1_uuid = uuid.uuid4()
@@ -830,7 +824,7 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         stmt = ("SELECT * from set_index_search.users where uuids contains 
{some_uuid}").format(some_uuid=uuid.uuid4())
         row = list(session.execute(stmt))
-        self.assertEqual(0, len(row))
+        assert 0 == len(row)
 
         _id = uuid.uuid4()
         # alter the row to add a single item to the indexed set
@@ -839,7 +833,7 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         stmt = ("SELECT * from set_index_search.users where uuids contains 
{some_uuid}").format(some_uuid=_id)
         row = list(session.execute(stmt))
-        self.assertEqual(1, len(row))
+        assert 1 == len(row)
 
         # add a bunch of user records and query them back
         shared_uuid = uuid.uuid4()  # this uuid will be on all records
@@ -866,7 +860,7 @@ class TestSecondaryIndexesOnCollections(Tester):
         stmt = ("SELECT * from set_index_search.users where uuids contains 
{shared_uuid}").format(shared_uuid=shared_uuid)
         rows = session.execute(stmt)
         result = [row for row in rows]
-        self.assertEqual(50000, len(result))
+        assert 50000 == len(result)
 
         # shuffle the log in-place, and double-check a slice of records by querying the secondary index
         random.shuffle(log)
@@ -876,14 +870,14 @@ class TestSecondaryIndexesOnCollections(Tester):
                     ).format(unshared_uuid=log_entry['unshared_uuid'])
             rows = list(session.execute(stmt))
 
-            self.assertEqual(1, len(rows))
+            assert 1 == len(rows)
 
             db_user_id, db_email, db_uuids = rows[0]
 
-            self.assertEqual(db_user_id, log_entry['user_id'])
-            self.assertEqual(db_email, log_entry['email'])
-            self.assertTrue(shared_uuid in db_uuids)
-            self.assertTrue(log_entry['unshared_uuid'] in db_uuids)
+            assert db_user_id == log_entry['user_id']
+            assert db_email == log_entry['email']
+            assert shared_uuid in db_uuids
+            assert log_entry['unshared_uuid'] in db_uuids
 
     @since('3.0')
     def test_multiple_indexes_on_single_map_column(self):
@@ -916,29 +910,29 @@ class TestSecondaryIndexesOnCollections(Tester):
         session.execute("INSERT INTO map_tbl (id, amap) values (uuid(), 
{'faz': 1, 'baz': 2});")
 
         value_search = list(session.execute("SELECT * FROM map_tbl WHERE amap CONTAINS 1"))
-        self.assertEqual(2, len(value_search), "incorrect number of rows when querying on map values")
+        assert 2 == len(value_search), "incorrect number of rows when querying on map values"

         key_search = list(session.execute("SELECT * FROM map_tbl WHERE amap CONTAINS KEY 'foo'"))
-        self.assertEqual(1, len(key_search), "incorrect number of rows when querying on map keys")
+        assert 1 == len(key_search), "incorrect number of rows when querying on map keys"

         entries_search = list(session.execute("SELECT * FROM map_tbl WHERE amap['foo'] = 1"))
-        self.assertEqual(1, len(entries_search), "incorrect number of rows when querying on map entries")
+        assert 1 == len(entries_search), "incorrect number of rows when querying on map entries"
 
         session.cluster.refresh_schema_metadata()
         table_meta = session.cluster.metadata.keyspaces["map_double_index"].tables["map_tbl"]
-        self.assertEqual(3, len(table_meta.indexes))
-        self.assertItemsEqual(['map_keys', 'map_values', 'map_entries'], table_meta.indexes)
-        self.assertEqual(3, len(session.cluster.metadata.keyspaces["map_double_index"].indexes))
+        assert 3 == len(table_meta.indexes)
+        assert {'map_keys', 'map_values', 'map_entries'} == set(table_meta.indexes.keys())
+        assert 3 == len(session.cluster.metadata.keyspaces["map_double_index"].indexes)
 
-        self.assertTrue('map_keys' in table_meta.export_as_string())
-        self.assertTrue('map_values' in table_meta.export_as_string())
-        self.assertTrue('map_entries' in table_meta.export_as_string())
+        assert 'map_keys' in table_meta.export_as_string()
+        assert 'map_values' in table_meta.export_as_string()
+        assert 'map_entries' in table_meta.export_as_string()
 
         session.execute("DROP TABLE map_tbl")
         session.cluster.refresh_schema_metadata()
-        self.assertEqual(0, len(session.cluster.metadata.keyspaces["map_double_index"].indexes))
+        assert 0 == len(session.cluster.metadata.keyspaces["map_double_index"].indexes)
 
-    @skipIf(OFFHEAP_MEMTABLES, 'Hangs with offheap memtables')
+    @pytest.mark.no_offheap_memtables
     def test_map_indexes(self):
         """
         Checks that secondary indexes on maps work for querying on both keys 
and values
@@ -961,7 +955,7 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         stmt = "SELECT * from map_index_search.users where uuids contains key 
{some_uuid}".format(some_uuid=uuid.uuid4())
         rows = list(session.execute(stmt))
-        self.assertEqual(0, len(rows))
+        assert 0 == len(rows)
 
         # add a row which doesn't specify data for the indexed column, and query again
         user1_uuid = uuid.uuid4()
@@ -972,7 +966,7 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         stmt = ("SELECT * from map_index_search.users where uuids contains key 
{some_uuid}").format(some_uuid=uuid.uuid4())
         rows = list(session.execute(stmt))
-        self.assertEqual(0, len(rows))
+        assert 0 == len(rows)
 
         _id = uuid.uuid4()
 
@@ -983,7 +977,7 @@ class TestSecondaryIndexesOnCollections(Tester):
 
         stmt = ("SELECT * from map_index_search.users where uuids contains key 
{some_uuid}").format(some_uuid=_id)
         rows = list(session.execute(stmt))
-        self.assertEqual(1, len(rows))
+        assert 1 == len(rows)
 
         # add a bunch of user records and query them back
         shared_uuid = uuid.uuid4()  # this uuid will be on all records
@@ -1012,7 +1006,7 @@ class TestSecondaryIndexesOnCollections(Tester):
                 ).format(shared_uuid=shared_uuid)
         rows = session.execute(stmt)
         result = [row for row in rows]
-        self.assertEqual(50000, len(result))
+        assert 50000 == len(result)
 
         # shuffle the log in-place, and double-check a slice of records by querying the secondary index on keys
         random.shuffle(log)
@@ -1023,15 +1017,15 @@ class TestSecondaryIndexesOnCollections(Tester):
             row = session.execute(stmt)
 
             result = list(row)
-            rows = self.assertEqual(1, len(result))
+            assert 1 == len(result)
 
             db_user_id, db_email, db_uuids = result[0]
 
-            self.assertEqual(db_user_id, log_entry['user_id'])
-            self.assertEqual(db_email, log_entry['email'])
+            assert db_user_id == log_entry['user_id']
+            assert db_email == log_entry['email']
 
-            self.assertTrue(shared_uuid in db_uuids)
-            self.assertTrue(log_entry['unshared_uuid1'] in db_uuids)
+            assert shared_uuid in db_uuids
+            assert log_entry['unshared_uuid1'] in db_uuids
 
         # attempt to add an index on map values as well (should fail pre 3.0)
         stmt = "CREATE INDEX user_uuids_values on map_index_search.users 
(uuids);"
@@ -1066,14 +1060,14 @@ class TestSecondaryIndexesOnCollections(Tester):
                     ).format(unshared_uuid2=log_entry['unshared_uuid2'])
 
             rows = list(session.execute(stmt))
-            self.assertEqual(1, len(rows), rows)
+            assert 1 == len(rows), rows
 
             db_user_id, db_email, db_uuids = rows[0]
-            self.assertEqual(db_user_id, log_entry['user_id'])
-            self.assertEqual(db_email, log_entry['email'])
+            assert db_user_id == log_entry['user_id']
+            assert db_email == log_entry['email']
 
-            self.assertTrue(shared_uuid in db_uuids)
-            self.assertTrue(log_entry['unshared_uuid2'] in db_uuids.values())
+            assert shared_uuid in db_uuids
+            assert log_entry['unshared_uuid2'] in list(db_uuids.values())
 
 
 class TestUpgradeSecondaryIndexes(Tester):
@@ -1106,22 +1100,22 @@ class TestUpgradeSecondaryIndexes(Tester):
         node1.drain()
         node1.watch_log_for("DRAINED")
         node1.stop(wait_other_notice=False)
-        debug("Upgrading to current version")
+        logger.debug("Upgrading to current version")
         self.set_node_to_current_version(node1)
         node1.start(wait_other_notice=True)
 
         [node1] = cluster.nodelist()
         session = self.patient_cql_connection(node1)
-        debug(cluster.cassandra_version())
+        logger.debug(cluster.cassandra_version())
         assert_one(session, query, [0, 0])
 
     def upgrade_to_version(self, tag, nodes=None):
-        debug('Upgrading to ' + tag)
+        logger.debug('Upgrading to ' + tag)
         if nodes is None:
             nodes = self.cluster.nodelist()
 
         for node in nodes:
-            debug('Shutting down node: ' + node.name)
+            logger.debug('Shutting down node: ' + node.name)
             node.drain()
             node.watch_log_for("DRAINED")
             node.stop(wait_other_notice=False)
@@ -1129,25 +1123,24 @@ class TestUpgradeSecondaryIndexes(Tester):
         # Update Cassandra Directory
         for node in nodes:
             node.set_install_dir(version=tag)
-            debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
+            logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
         self.cluster.set_install_dir(version=tag)
 
         # Restart nodes on new version
         for node in nodes:
-            debug('Starting %s on new version (%s)' % (node.name, tag))
+            logger.debug('Starting %s on new version (%s)' % (node.name, tag))
             # Setup log4j / logback again (necessary moving from 2.0 -> 2.1):
             node.set_log_level("INFO")
             node.start(wait_other_notice=True)
             # node.nodetool('upgradesstables -a')
 
 
-@skipIf(CASSANDRA_VERSION_FROM_BUILD == '3.9', "Test doesn't run on 3.9")
 @since('3.10')
 class TestPreJoinCallback(Tester):
 
-    def __init__(self, *args, **kwargs):
-        # Ignore these log patterns:
-        self.ignore_log_patterns = [
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.ignore_log_patterns = [
             # ignore all streaming errors during bootstrap
             r'Exception encountered during startup',
             r'Streaming error occurred',
@@ -1156,7 +1149,6 @@ class TestPreJoinCallback(Tester):
             r'\[Stream.*\] Remote peer 127.0.0.\d:7000 failed stream session',
             r'Error while waiting on bootstrap to complete. Bootstrap will have to be restarted.'
         ]
-        Tester.__init__(self, *args, **kwargs)
 
     def _base_test(self, joinFn):
         cluster = self.cluster
@@ -1182,16 +1174,16 @@ class TestPreJoinCallback(Tester):
         # Run the join function to test
         joinFn(cluster, tokens[1])
 
-    def bootstrap_test(self):
+    def test_bootstrap(self):
         def bootstrap(cluster, token):
             node2 = new_node(cluster)
             node2.set_configuration_options(values={'initial_token': token})
             node2.start(wait_for_binary_proto=True)
-            self.assertTrue(node2.grep_log('Executing pre-join post-bootstrap tasks'))
+            assert node2.grep_log('Executing pre-join post-bootstrap tasks')
 
         self._base_test(bootstrap)
 
-    def resume_test(self):
+    def test_resume(self):
         def resume(cluster, token):
             node1 = cluster.nodes['node1']
             # set up byteman on node1 to inject a failure when streaming to node2
@@ -1217,33 +1209,33 @@ class TestPreJoinCallback(Tester):
 
             node2.nodetool("bootstrap resume")
             assert_bootstrap_state(self, node2, 'COMPLETED')
-            self.assertTrue(node2.grep_log('Executing pre-join post-bootstrap tasks'))
+            assert node2.grep_log('Executing pre-join post-bootstrap tasks')
 
         self._base_test(resume)
 
-    def manual_join_test(self):
+    def test_manual_join(self):
         def manual_join(cluster, token):
             node2 = new_node(cluster)
             node2.set_configuration_options(values={'initial_token': token})
             node2.start(join_ring=False, wait_for_binary_proto=True, wait_other_notice=240)
-            self.assertTrue(node2.grep_log('Not joining ring as requested'))
-            self.assertFalse(node2.grep_log('Executing pre-join'))
+            assert node2.grep_log('Not joining ring as requested')
+            assert not node2.grep_log('Executing pre-join')
 
             node2.nodetool("join")
-            self.assertTrue(node2.grep_log('Executing pre-join post-bootstrap tasks'))
+            assert node2.grep_log('Executing pre-join post-bootstrap tasks')
 
         self._base_test(manual_join)
 
-    def write_survey_test(self):
+    def test_write_survey(self):
         def write_survey_and_join(cluster, token):
             node2 = new_node(cluster)
             node2.set_configuration_options(values={'initial_token': token})
             node2.start(jvm_args=["-Dcassandra.write_survey=true"], wait_for_binary_proto=True)
-            self.assertTrue(node2.grep_log('Startup complete, but write survey mode is active, not becoming an active ring member.'))
+            assert node2.grep_log('Startup complete, but write survey mode is active, not becoming an active ring member.')
active, not becoming an active ring member.')
+            assert not node2.grep_log('Executing pre-join')
 
             node2.nodetool("join")
-            self.assertTrue(node2.grep_log('Leaving write survey mode and joining ring at operator request'))
-            self.assertTrue(node2.grep_log('Executing pre-join post-bootstrap tasks'))
+            assert node2.grep_log('Leaving write survey mode and joining ring at operator request')
+            assert node2.grep_log('Executing pre-join post-bootstrap tasks')
 
         self._base_test(write_survey_and_join)

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/snapshot_test.py
----------------------------------------------------------------------
diff --git a/snapshot_test.py b/snapshot_test.py
index 1aa5a70..9b561ce 100644
--- a/snapshot_test.py
+++ b/snapshot_test.py
@@ -4,16 +4,20 @@ import os
 import shutil
 import subprocess
 import time
+import pytest
+import logging
 
 from cassandra.concurrent import execute_concurrent_with_args
 
-from dtest import (Tester, cleanup_cluster, create_ccm_cluster, create_ks,
-                   debug, get_test_path)
+from dtest_setup_overrides import DTestSetupOverrides
+from dtest import Tester, create_ks
 from tools.assertions import assert_one
 from tools.files import replace_in_file, safe_mkdtemp
 from tools.hacks import advance_to_next_cl_segment
-from tools.misc import ImmutableMapping
-from tools.decorators import since
+from tools.misc import ImmutableMapping, get_current_test_name
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class SnapshotTester(Tester):
@@ -28,10 +32,10 @@ class SnapshotTester(Tester):
         execute_concurrent_with_args(session, insert_statement, args, concurrency=20)
 
     def make_snapshot(self, node, ks, cf, name):
-        debug("Making snapshot....")
+        logger.debug("Making snapshot....")
         node.flush()
         snapshot_cmd = 'snapshot {ks} -cf {cf} -t {name}'.format(ks=ks, cf=cf, name=name)
-        debug("Running snapshot cmd: {snapshot_cmd}".format(snapshot_cmd=snapshot_cmd))
+        logger.debug("Running snapshot cmd: {snapshot_cmd}".format(snapshot_cmd=snapshot_cmd))
         node.nodetool(snapshot_cmd)
         tmpdir = safe_mkdtemp()
         os.mkdir(os.path.join(tmpdir, ks))
@@ -47,8 +51,8 @@ class SnapshotTester(Tester):
                     snapshot_dir = snapshot_dirs[0]
                 else:
                     continue
-            debug("snapshot_dir is : " + snapshot_dir)
-            debug("snapshot copy is : " + tmpdir)
+            logger.debug("snapshot_dir is : " + snapshot_dir)
+            logger.debug("snapshot copy is : " + tmpdir)
 
             # Copy files from the snapshot dir to existing temp dir
             distutils.dir_util.copy_tree(str(snapshot_dir), os.path.join(tmpdir, str(x), ks, cf))
@@ -57,8 +61,8 @@ class SnapshotTester(Tester):
         return tmpdir
 
     def restore_snapshot(self, snapshot_dir, node, ks, cf):
-        debug("Restoring snapshot....")
-        for x in xrange(0, self.cluster.data_dir_count):
+        logger.debug("Restoring snapshot....")
+        for x in range(0, self.cluster.data_dir_count):
             snap_dir = os.path.join(snapshot_dir, str(x), ks, cf)
             if os.path.exists(snap_dir):
                 ip = node.address()
@@ -70,11 +74,11 @@ class SnapshotTester(Tester):
 
                 if exit_status != 0:
                     raise Exception("sstableloader command '%s' failed; exit 
status: %d'; stdout: %s; stderr: %s" %
-                                    (" ".join(args), exit_status, stdout, 
stderr))
+                                    (" ".join(args), exit_status, 
stdout.decode("utf-8"), stderr.decode("utf-8")))
 
     def restore_snapshot_schema(self, snapshot_dir, node, ks, cf):
-        debug("Restoring snapshot schema....")
-        for x in xrange(0, self.cluster.data_dir_count):
+        logger.debug("Restoring snapshot schema....")
+        for x in range(0, self.cluster.data_dir_count):
             schema_path = os.path.join(snapshot_dir, str(x), ks, cf, 'schema.cql')
             if os.path.exists(schema_path):
                 node.run_cqlsh(cmds="SOURCE '%s'" % schema_path)
@@ -96,13 +100,13 @@ class TestSnapshot(SnapshotTester):
         # away when we restore:
         self.insert_rows(session, 100, 200)
         rows = session.execute('SELECT count(*) from ks.cf')
-        self.assertEqual(rows[0][0], 200)
+        assert rows[0][0] == 200
 
         # Drop the keyspace, make sure we have no data:
         session.execute('DROP KEYSPACE ks')
         self.create_schema(session)
         rows = session.execute('SELECT count(*) from ks.cf')
-        self.assertEqual(rows[0][0], 0)
+        assert rows[0][0] == 0
 
         # Restore data from snapshot:
         self.restore_snapshot(snapshot_dir, node1, 'ks', 'cf')
@@ -110,10 +114,10 @@ class TestSnapshot(SnapshotTester):
         rows = session.execute('SELECT count(*) from ks.cf')
 
         # clean up
-        debug("removing snapshot_dir: " + snapshot_dir)
+        logger.debug("removing snapshot_dir: " + snapshot_dir)
         shutil.rmtree(snapshot_dir)
 
-        self.assertEqual(rows[0][0], 100)
+        assert rows[0][0] == 100
 
     @since('3.0')
     def test_snapshot_and_restore_drop_table_remove_dropped_column(self):
@@ -146,7 +150,7 @@ class TestSnapshot(SnapshotTester):
         assert_one(session, "SELECT * FROM ks.cf", [1, "a", "b"])
 
         # Clean up
-        debug("removing snapshot_dir: " + snapshot_dir)
+        logger.debug("removing snapshot_dir: " + snapshot_dir)
         shutil.rmtree(snapshot_dir)
 
     @since('3.11')
@@ -182,22 +186,27 @@ class TestSnapshot(SnapshotTester):
         assert_one(session, "SELECT * FROM ks.cf", [1, "a"])
 
         # Clean up
-        debug("removing snapshot_dir: " + snapshot_dir)
+        logger.debug("removing snapshot_dir: " + snapshot_dir)
         shutil.rmtree(snapshot_dir)
 
 
 class TestArchiveCommitlog(SnapshotTester):
-    cluster_options = ImmutableMapping({'commitlog_segment_size_in_mb': 1})
+
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_dtest_setup_overrides(self):
+        dtest_setup_overrides = DTestSetupOverrides()
+        dtest_setup_overrides.cluster_options = ImmutableMapping({'commitlog_segment_size_in_mb': 1})
+        return dtest_setup_overrides
 
     def make_snapshot(self, node, ks, cf, name):
-        debug("Making snapshot....")
+        logger.debug("Making snapshot....")
         node.flush()
         snapshot_cmd = 'snapshot {ks} -cf {cf} -t {name}'.format(ks=ks, cf=cf, name=name)
-        debug("Running snapshot cmd: {snapshot_cmd}".format(snapshot_cmd=snapshot_cmd))
+        logger.debug("Running snapshot cmd: {snapshot_cmd}".format(snapshot_cmd=snapshot_cmd))
         node.nodetool(snapshot_cmd)
         tmpdirs = []
         base_tmpdir = safe_mkdtemp()
-        for x in xrange(0, self.cluster.data_dir_count):
+        for x in range(0, self.cluster.data_dir_count):
             tmpdir = os.path.join(base_tmpdir, str(x))
             os.mkdir(tmpdir)
             # Copy files from the snapshot dir to existing temp dir
@@ -207,7 +216,7 @@ class TestArchiveCommitlog(SnapshotTester):
         return tmpdirs
 
     def restore_snapshot(self, snapshot_dir, node, ks, cf, name):
-        debug("Restoring snapshot for cf ....")
+        logger.debug("Restoring snapshot for cf ....")
         data_dir = os.path.join(node.get_path(), 'data{0}'.format(os.path.basename(snapshot_dir)))
         cfs = [s for s in os.listdir(snapshot_dir) if s.startswith(cf + "-")]
         if len(cfs) > 0:
@@ -220,7 +229,7 @@ class TestArchiveCommitlog(SnapshotTester):
                     os.mkdir(os.path.join(data_dir, ks))
                 os.mkdir(os.path.join(data_dir, ks, cf_id))
 
-                debug("snapshot_dir is : " + snapshot_dir)
+                logger.debug("snapshot_dir is : " + snapshot_dir)
                 distutils.dir_util.copy_tree(snapshot_dir, os.path.join(data_dir, ks, cf_id))
 
     def test_archive_commitlog(self):
@@ -232,7 +241,7 @@ class TestArchiveCommitlog(SnapshotTester):
         """
         self.run_archive_commitlog(restore_point_in_time=False, archive_active_commitlogs=True)
 
-    def dont_test_archive_commitlog(self):
+    def test_dont_archive_commitlog(self):
         """
         Run the archive commitlog test, but forget to add the restore commands
         """
@@ -267,7 +276,7 @@ class TestArchiveCommitlog(SnapshotTester):
 
         # Create a temp directory for storing commitlog archives:
         tmp_commitlog = safe_mkdtemp()
-        debug("tmp_commitlog: " + tmp_commitlog)
+        logger.debug("tmp_commitlog: " + tmp_commitlog)
 
         # Edit commitlog_archiving.properties and set an archive
         # command:
@@ -289,14 +298,14 @@ class TestArchiveCommitlog(SnapshotTester):
         )
 
         session.execute('CREATE TABLE ks.cf ( key bigint PRIMARY KEY, val text);')
-        debug("Writing first 30,000 rows...")
+        logger.debug("Writing first 30,000 rows...")
         self.insert_rows(session, 0, 30000)
         # Record when this first set of inserts finished:
         insert_cutoff_times = [time.gmtime()]
 
         # Delete all commitlog backups so far:
         for f in glob.glob(tmp_commitlog + "/*"):
-            debug('Removing {}'.format(f))
+            logger.debug('Removing {}'.format(f))
             os.remove(f)
 
         snapshot_dirs = self.make_snapshot(node1, 'ks', 'cf', 'basic')
@@ -323,14 +332,14 @@ class TestArchiveCommitlog(SnapshotTester):
 
         try:
             # Write more data:
-            debug("Writing second 30,000 rows...")
+            logger.debug("Writing second 30,000 rows...")
             self.insert_rows(session, 30000, 60000)
             node1.flush()
             time.sleep(10)
             # Record when this second set of inserts finished:
             insert_cutoff_times.append(time.gmtime())
 
-            debug("Writing final 5,000 rows...")
+            logger.debug("Writing final 5,000 rows...")
             self.insert_rows(session, 60000, 65000)
             # Record when the third set of inserts finished:
             insert_cutoff_times.append(time.gmtime())
@@ -340,17 +349,16 @@ class TestArchiveCommitlog(SnapshotTester):
 
             rows = session.execute('SELECT count(*) from ks.cf')
             # Make sure we have the same amount of rows as when we snapshotted:
-            self.assertEqual(rows[0][0], 65000)
+            assert rows[0][0] == 65000
 
             # Check that there are at least one commit log backed up that
             # is not one of the active commit logs:
             commitlog_dir = os.path.join(node1.get_path(), 'commitlogs')
-            debug("node1 commitlog dir: " + commitlog_dir)
-            debug("node1 commitlog dir contents: " + 
str(os.listdir(commitlog_dir)))
-            debug("tmp_commitlog contents: " + str(os.listdir(tmp_commitlog)))
+            logger.debug("node1 commitlog dir: " + commitlog_dir)
+            logger.debug("node1 commitlog dir contents: " + 
str(os.listdir(commitlog_dir)))
+            logger.debug("tmp_commitlog contents: " + 
str(os.listdir(tmp_commitlog)))
 
-            self.assertNotEqual(set(os.listdir(tmp_commitlog)) - set(os.listdir(commitlog_dir)),
-                                set())
+            assert_directory_not_empty(tmp_commitlog, commitlog_dir)
 
             cluster.flush()
             cluster.compact()
@@ -358,15 +366,16 @@ class TestArchiveCommitlog(SnapshotTester):
 
             # Destroy the cluster
             cluster.stop()
-            debug("node1 commitlog dir contents after stopping: " + 
str(os.listdir(commitlog_dir)))
-            debug("tmp_commitlog contents after stopping: " + 
str(os.listdir(tmp_commitlog)))
+            logger.debug("node1 commitlog dir contents after stopping: " + 
str(os.listdir(commitlog_dir)))
+            logger.debug("tmp_commitlog contents after stopping: " + 
str(os.listdir(tmp_commitlog)))
 
-            self.copy_logs(self.cluster, name=self.id().split(".")[0] + 
"_pre-restore")
-            cleanup_cluster(self.cluster, self.test_path)
-            self.test_path = get_test_path()
-            cluster = self.cluster = create_ccm_cluster(self.test_path, 
name='test')
+            self.copy_logs(name=get_current_test_name() + "_pre-restore")
+            self.fixture_dtest_setup.cleanup_and_replace_cluster()
+            cluster = self.cluster
             cluster.populate(1)
-            node1, = cluster.nodelist()
+            nodes = cluster.nodelist()
+            assert len(nodes) == 1
+            node1 = nodes[0]
 
             # Restore schema from snapshots:
             for system_ks_snapshot_dir in system_ks_snapshot_dirs:
@@ -400,7 +409,7 @@ class TestArchiveCommitlog(SnapshotTester):
 
             rows = session.execute('SELECT count(*) from ks.cf')
             # Make sure we have the same amount of rows as when we snapshotted:
-            self.assertEqual(rows[0][0], 30000)
+            assert rows[0][0] == 30000
 
             # Edit commitlog_archiving.properties. Remove the archive
             # command  and set a restore command and restore_directories:
@@ -416,7 +425,7 @@ class TestArchiveCommitlog(SnapshotTester):
                     replace_in_file(os.path.join(node1.get_path(), 'conf', 'commitlog_archiving.properties'),
                                     [(r'^restore_point_in_time=.*$', 'restore_point_in_time={restore_time}'.format(restore_time=restore_time))])
 
-            debug("Restarting node1..")
+            logger.debug("Restarting node1..")
             node1.stop()
             node1.start(wait_for_binary_proto=True)
 
@@ -428,31 +437,31 @@ class TestArchiveCommitlog(SnapshotTester):
             # Now we should have 30000 rows from the snapshot + 30000 rows
             # from the commitlog backups:
             if not restore_archived_commitlog:
-                self.assertEqual(rows[0][0], 30000)
+                assert rows[0][0] == 30000
             elif restore_point_in_time:
-                self.assertEqual(rows[0][0], 60000)
+                assert rows[0][0] == 60000
             else:
-                self.assertEqual(rows[0][0], 65000)
+                assert rows[0][0] == 65000
 
         finally:
             # clean up
-            debug("removing snapshot_dir: " + ",".join(snapshot_dirs))
+            logger.debug("removing snapshot_dir: " + ",".join(snapshot_dirs))
             for snapshot_dir in snapshot_dirs:
                 shutil.rmtree(snapshot_dir)
-            debug("removing snapshot_dir: " + 
",".join(system_ks_snapshot_dirs))
+            logger.debug("removing snapshot_dir: " + 
",".join(system_ks_snapshot_dirs))
             for system_ks_snapshot_dir in system_ks_snapshot_dirs:
                 shutil.rmtree(system_ks_snapshot_dir)
-            debug("removing snapshot_dir: " + 
",".join(system_cfs_snapshot_dirs))
+            logger.debug("removing snapshot_dir: " + 
",".join(system_cfs_snapshot_dirs))
             for system_cfs_snapshot_dir in system_cfs_snapshot_dirs:
                 shutil.rmtree(system_cfs_snapshot_dir)
-            debug("removing snapshot_dir: " + 
",".join(system_ut_snapshot_dirs))
+            logger.debug("removing snapshot_dir: " + 
",".join(system_ut_snapshot_dirs))
             for system_ut_snapshot_dir in system_ut_snapshot_dirs:
                 shutil.rmtree(system_ut_snapshot_dir)
-            debug("removing snapshot_dir: " + 
",".join(system_col_snapshot_dirs))
+            logger.debug("removing snapshot_dir: " + 
",".join(system_col_snapshot_dirs))
             for system_col_snapshot_dir in system_col_snapshot_dirs:
                 shutil.rmtree(system_col_snapshot_dir)
 
-            debug("removing tmp_commitlog: " + tmp_commitlog)
+            logger.debug("removing tmp_commitlog: " + tmp_commitlog)
             shutil.rmtree(tmp_commitlog)
 
     def test_archive_and_restore_commitlog_repeatedly(self):
@@ -461,14 +470,13 @@ class TestArchiveCommitlog(SnapshotTester):
         Run archive commit log restoration test repeatedly to make sure it is idempotent
         and doesn't fail if done repeatedly
         """
-
         cluster = self.cluster
         cluster.populate(1)
         node1 = cluster.nodelist()[0]
 
         # Create a temp directory for storing commitlog archives:
         tmp_commitlog = safe_mkdtemp()
-        debug("tmp_commitlog: {}".format(tmp_commitlog))
+        logger.debug("tmp_commitlog: {}".format(tmp_commitlog))
 
         # Edit commitlog_archiving.properties and set an archive
         # command:
@@ -481,32 +489,31 @@ class TestArchiveCommitlog(SnapshotTester):
 
         cluster.start(wait_for_binary_proto=True)
 
-        debug("Creating initial connection")
+        logger.debug("Creating initial connection")
         session = self.patient_cql_connection(node1)
         create_ks(session, 'ks', 1)
         session.execute('CREATE TABLE ks.cf ( key bigint PRIMARY KEY, val text);')
-        debug("Writing 30,000 rows...")
+        logger.debug("Writing 30,000 rows...")
         self.insert_rows(session, 0, 60000)
 
         try:
             # Check that there are at least one commit log backed up that
             # is not one of the active commit logs:
             commitlog_dir = os.path.join(node1.get_path(), 'commitlogs')
-            debug("node1 commitlog dir: " + commitlog_dir)
+            logger.debug("node1 commitlog dir: " + commitlog_dir)
 
             cluster.flush()
 
-            self.assertNotEqual(set(os.listdir(tmp_commitlog)) - set(os.listdir(commitlog_dir)),
-                                set())
+            assert_directory_not_empty(tmp_commitlog, commitlog_dir)
 
-            debug("Flushing and doing first restart")
+            logger.debug("Flushing and doing first restart")
             cluster.compact()
             node1.drain()
             # restart the node which causes the active commitlogs to be archived
             node1.stop()
             node1.start(wait_for_binary_proto=True)
 
-            debug("Stopping and second restart")
+            logger.debug("Stopping and second restart")
             node1.stop()
             node1.start(wait_for_binary_proto=True)
 
@@ -514,7 +521,14 @@ class TestArchiveCommitlog(SnapshotTester):
             session = self.patient_cql_connection(node1)
 
             rows = session.execute('SELECT count(*) from ks.cf')
-            self.assertEqual(rows[0][0], 60000)
+            assert rows[0][0] == 60000
         finally:
-            debug("removing tmp_commitlog: " + tmp_commitlog)
+            logger.debug("removing tmp_commitlog: " + tmp_commitlog)
             shutil.rmtree(tmp_commitlog)
+
+
+def assert_directory_not_empty(tmp_commitlog, commitlog_dir):
+    # The archive directory must contain at least one commitlog segment
+    # that is not one of the node's active commitlogs.
+    archived_segments = set(os.listdir(tmp_commitlog))
+    for active_segment in os.listdir(commitlog_dir):
+        archived_segments.discard(active_segment)
+    assert len(archived_segments) != 0
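
A minimal sketch (editor's note, not part of the patch) of the per-class override pattern the TestArchiveCommitlog hunk above relies on: an autouse fixture named fixture_dtest_setup_overrides is assumed to be picked up by the dtest framework, which reads its cluster_options mapping when building the test cluster. The class name is illustrative, and the import locations for DTestSetupOverrides and ImmutableMapping are assumptions, not verified against the repository:

    import pytest
    from dtest import Tester
    from dtest_setup_overrides import DTestSetupOverrides   # assumed module path
    from tools.misc import ImmutableMapping                 # assumed module path

    class TestWithSmallCommitlogSegments(Tester):

        @pytest.fixture(scope='function', autouse=True)
        def fixture_dtest_setup_overrides(self):
            overrides = DTestSetupOverrides()
            # keep segments tiny so commitlog archiving kicks in quickly
            overrides.cluster_options = ImmutableMapping({'commitlog_segment_size_in_mb': 1})
            return overrides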

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/snitch_test.py
----------------------------------------------------------------------
diff --git a/snitch_test.py b/snitch_test.py
index 334a1a1..164c81e 100644
--- a/snitch_test.py
+++ b/snitch_test.py
@@ -1,16 +1,17 @@
 import os
 import socket
 import time
-
-from nose.plugins.attrib import attr
+import pytest
+import logging
 
 from cassandra import ConsistencyLevel
-from dtest import Tester, debug
-from nose.tools import assert_true, assert_equal, assert_greater_equal
-from tools.decorators import since
+from dtest import Tester
 from tools.jmxutils import (JolokiaAgent, make_mbean,
                             remove_perf_disable_shared_mem)
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 
 @since('2.2.5')
 class TestGossipingPropertyFileSnitch(Tester):
@@ -103,21 +104,21 @@ class TestGossipingPropertyFileSnitch(Tester):
         # read data from node2 just to make sure data and connectivity is OK
         session = self.patient_exclusive_cql_connection(node2)
         new_rows = list(session.execute("SELECT * FROM 
{}".format(stress_table)))
-        self.assertEquals(original_rows, new_rows)
+        assert original_rows == new_rows
 
         out, err, _ = node1.nodetool('gossipinfo')
-        self.assertEqual(0, len(err), err)
-        debug(out)
+        assert 0 == len(err), err
+        logger.debug(out)
 
-        self.assertIn("/{}".format(NODE1_BROADCAST_ADDRESS), out)
-        self.assertIn("INTERNAL_IP:{}:{}".format('9' if running40 else '6', 
NODE1_LISTEN_ADDRESS), out)
-        
self.assertIn("INTERNAL_ADDRESS_AND_PORT:7:{}".format(NODE1_40_LISTEN_ADDRESS), 
out)
-        self.assertIn("/{}".format(NODE2_BROADCAST_ADDRESS), out)
-        self.assertIn("INTERNAL_IP:{}:{}".format('9' if running40 else '6', 
NODE2_LISTEN_ADDRESS), out)
-        
self.assertIn("INTERNAL_ADDRESS_AND_PORT:7:{}".format(NODE1_40_LISTEN_ADDRESS), 
out)
+        assert "/{}".format(NODE1_BROADCAST_ADDRESS) in out
+        assert "INTERNAL_IP:{}:{}".format('9' if running40 else '6', 
NODE1_LISTEN_ADDRESS) in out
+        assert 
"INTERNAL_ADDRESS_AND_PORT:7:{}".format(NODE1_40_LISTEN_ADDRESS) in out
+        assert "/{}".format(NODE2_BROADCAST_ADDRESS) in out
+        assert "INTERNAL_IP:{}:{}".format('9' if running40 else '6', 
NODE2_LISTEN_ADDRESS) in out
+        assert 
"INTERNAL_ADDRESS_AND_PORT:7:{}".format(NODE1_40_LISTEN_ADDRESS) in out
 
 class TestDynamicEndpointSnitch(Tester):
-    @attr('resource-intensive')
+    @pytest.mark.resource_intensive
     @since('3.10')
     def test_multidatacenter_local_quorum(self):
         '''
@@ -175,20 +176,18 @@ class TestDynamicEndpointSnitch(Tester):
                 for x in range(0, 300):
                     degraded_reads_before = bad_jmx.read_attribute(read_stage, 'Value')
                     scores_before = jmx.read_attribute(des, 'Scores')
-                    assert_true(no_cross_dc(scores_before, [node4, node5, node6]),
-                                "Cross DC scores were present: " + str(scores_before))
+                    assert no_cross_dc(scores_before, [node4, node5, node6]), "Cross DC scores were present: " + str(scores_before)
                     future = session.execute_async(read_stmt, [x])
                     future.result()
                     scores_after = jmx.read_attribute(des, 'Scores')
-                    assert_true(no_cross_dc(scores_after, [node4, node5, node6]),
-                                "Cross DC scores were present: " + str(scores_after))
+                    assert no_cross_dc(scores_after, [node4, node5, node6]), "Cross DC scores were present: " + str(scores_after)
 
                     if snitchable(scores_before, scores_after,
                                   [coordinator_node, healthy_node, degraded_node]):
                         snitchable_count = snitchable_count + 1
                         # If the DES correctly routed the read around the degraded node,
                         # it shouldn't have another completed read request in metrics
-                        assert_equal(degraded_reads_before,
+                        assert (degraded_reads_before ==
                                     bad_jmx.read_attribute(read_stage, 'Value'))
                     else:
                         # sleep to give dynamic snitch time to recalculate scores
@@ -196,4 +195,4 @@ class TestDynamicEndpointSnitch(Tester):
 
                 # check that most reads were snitchable, with some
                 # room allowed in case score recalculation is slow
-                assert_greater_equal(snitchable_count, 250)
+                assert snitchable_count >= 250
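
A minimal sketch (editor's illustration, not part of the patch) of how the "since = pytest.mark.since" alias used throughout these converted files turns the old @since('3.10') decorator into an ordinary pytest marker that a conftest.py hook can act on. The get_cassandra_version() helper and the version comparison are placeholders, not real dtest functions:

    import pytest

    since = pytest.mark.since      # so @since('3.10') is just @pytest.mark.since('3.10')

    # conftest.py (sketch)
    def pytest_collection_modifyitems(config, items):
        for item in items:
            marker = item.get_closest_marker('since')
            if marker and get_cassandra_version() < marker.args[0]:   # placeholder comparison
                item.add_marker(pytest.mark.skip(reason='requires a newer Cassandra version'))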

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/sslnodetonode_test.py
----------------------------------------------------------------------
diff --git a/sslnodetonode_test.py b/sslnodetonode_test.py
index 4c4a188..c34fa11 100644
--- a/sslnodetonode_test.py
+++ b/sslnodetonode_test.py
@@ -2,10 +2,14 @@ import os
 import os.path
 import shutil
 import time
+import pytest
+import logging
 
 from dtest import Tester
 from tools import sslkeygen
-from tools.decorators import since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 # as the error message logged will be different per netty ssl implementation (jdk vs openssl (libre vs boring vs ...)),
 # the best we can do is just look for a SSLHandshakeException
@@ -16,9 +20,8 @@ _LOG_ERR_GENERAL = "javax.net.ssl.SSLException"
 @since('3.6')
 class TestNodeToNodeSSLEncryption(Tester):
 
-    def ssl_enabled_test(self):
+    def test_ssl_enabled(self):
         """Should be able to start with valid ssl options"""
-
         credNode1 = sslkeygen.generate_credentials("127.0.0.1")
         credNode2 = sslkeygen.generate_credentials("127.0.0.2", 
credNode1.cakeystore, credNode1.cacert)
 
@@ -26,21 +29,19 @@ class TestNodeToNodeSSLEncryption(Tester):
         self.cluster.start()
         self.cql_connection(self.node1)
 
-    def ssl_correct_hostname_with_validation_test(self):
+    def test_ssl_correct_hostname_with_validation(self):
         """Should be able to start with valid ssl options"""
-
         credNode1 = sslkeygen.generate_credentials("127.0.0.1")
         credNode2 = sslkeygen.generate_credentials("127.0.0.2", 
credNode1.cakeystore, credNode1.cacert)
 
         self.setup_nodes(credNode1, credNode2, endpoint_verification=True)
-        self.allow_log_errors = False
+        self.fixture_dtest_setup.allow_log_errors = False
         self.cluster.start()
         time.sleep(2)
         self.cql_connection(self.node1)
 
-    def ssl_wrong_hostname_no_validation_test(self):
+    def test_ssl_wrong_hostname_no_validation(self):
         """Should be able to start with valid ssl options"""
-
         credNode1 = sslkeygen.generate_credentials("127.0.0.80")
         credNode2 = sslkeygen.generate_credentials("127.0.0.81", 
credNode1.cakeystore, credNode1.cacert)
 
@@ -49,49 +50,46 @@ class TestNodeToNodeSSLEncryption(Tester):
         time.sleep(2)
         self.cql_connection(self.node1)
 
-    def ssl_wrong_hostname_with_validation_test(self):
+    def test_ssl_wrong_hostname_with_validation(self):
         """Should be able to start with valid ssl options"""
-
         credNode1 = sslkeygen.generate_credentials("127.0.0.80")
         credNode2 = sslkeygen.generate_credentials("127.0.0.81", 
credNode1.cakeystore, credNode1.cacert)
 
         self.setup_nodes(credNode1, credNode2, endpoint_verification=True)
 
-        self.allow_log_errors = True
+        self.fixture_dtest_setup.allow_log_errors = True
         self.cluster.start(no_wait=True)
 
         found = self._grep_msg(self.node1, _LOG_ERR_HANDSHAKE, _LOG_ERR_GENERAL)
-        self.assertTrue(found)
+        assert found
 
         found = self._grep_msg(self.node2, _LOG_ERR_HANDSHAKE, _LOG_ERR_GENERAL)
-        self.assertTrue(found)
+        assert found
 
         self.cluster.stop()
 
-    def ssl_client_auth_required_fail_test(self):
+    def test_ssl_client_auth_required_fail(self):
         """peers need to perform mutual auth (cient auth required), but do not 
supply the local cert"""
-
         credNode1 = sslkeygen.generate_credentials("127.0.0.1")
         credNode2 = sslkeygen.generate_credentials("127.0.0.2")
 
         self.setup_nodes(credNode1, credNode2, client_auth=True)
 
-        self.allow_log_errors = True
+        self.fixture_dtest_setup.allow_log_errors = True
         self.cluster.start(no_wait=True)
         time.sleep(2)
 
         found = self._grep_msg(self.node1, _LOG_ERR_HANDSHAKE, _LOG_ERR_GENERAL)
-        self.assertTrue(found)
+        assert found
 
         found = self._grep_msg(self.node2, _LOG_ERR_HANDSHAKE, _LOG_ERR_GENERAL)
-        self.assertTrue(found)
+        assert found
 
         self.cluster.stop()
-        self.assertTrue(found)
+        assert found
 
-    def ssl_client_auth_required_succeed_test(self):
+    def test_ssl_client_auth_required_succeed(self):
         """peers need to perform mutual auth (cient auth required), but do not 
supply the loca cert"""
-
         credNode1 = sslkeygen.generate_credentials("127.0.0.1")
         credNode2 = sslkeygen.generate_credentials("127.0.0.2", 
credNode1.cakeystore, credNode1.cacert)
         sslkeygen.import_cert(credNode1.basedir, 'ca127.0.0.2', 
credNode2.cacert, credNode1.cakeystore)
@@ -102,23 +100,22 @@ class TestNodeToNodeSSLEncryption(Tester):
         self.cluster.start()
         self.cql_connection(self.node1)
 
-    def ca_mismatch_test(self):
+    def test_ca_mismatch(self):
         """CA mismatch should cause nodes to fail to connect"""
-
         credNode1 = sslkeygen.generate_credentials("127.0.0.1")
         credNode2 = sslkeygen.generate_credentials("127.0.0.2")  # mismatching 
CA!
 
         self.setup_nodes(credNode1, credNode2)
 
-        self.allow_log_errors = True
+        self.fixture_dtest_setup.allow_log_errors = True
         self.cluster.start(no_wait=True)
 
         found = self._grep_msg(self.node1, _LOG_ERR_HANDSHAKE)
         self.cluster.stop()
-        self.assertTrue(found)
+        assert found
 
     @since('4.0')
-    def optional_outbound_tls_test(self):
+    def test_optional_outbound_tls(self):
         """listen on TLS port, but optionally connect using TLS. this supports 
the upgrade case of starting with a non-encrypted cluster and then upgrading 
each node to use encryption.
 
         @jira_ticket CASSANDRA-10404
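
A minimal sketch (editor's illustration, not part of the patch) of the "grep the node log for any of several SSL error strings" strategy described in the comment near the top of this file, since the exact exception text differs between netty SSL implementations. It assumes ccm's Node.grep_log(expr) API, which these dtests commonly use; the helper name grep_for_any is hypothetical:

    def grep_for_any(node, *expressions):
        """Return True if the node's system.log matches any of the given expressions."""
        for expr in expressions:
            # node.grep_log returns the list of matching log lines (assumed ccm API)
            if node.grep_log(expr):
                return True
        return False

    # usage, mirroring the assertions above:
    #   assert grep_for_any(node1, _LOG_ERR_HANDSHAKE, _LOG_ERR_GENERAL)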

