http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/meta_tests/assertion_test.py
----------------------------------------------------------------------
diff --git a/meta_tests/assertion_test.py b/meta_tests/assertion_test.py
index a056e47..2c3fe09 100644
--- a/meta_tests/assertion_test.py
+++ b/meta_tests/assertion_test.py
@@ -34,14 +34,14 @@ class TestAssertStderrClean(TestCase):
 
     def test_invalid_error(self):
         err = "This string is no good and should fail."
-        with self.assertRaises(AssertionError):
+        with pytest.raises(AssertionError):
             assert_stderr_clean(err)
 
     def test_valid_and_invalid_errors_same_line(self):
         err = ("This string is no good and should fail.objc[36358]: Class 
JavaLaunchHelper is implemented in both 
/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk"
                "/Contents/Home/bin/java and 
/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/jre/lib/libinstrument.dylib."
                "One of the two will be used. Which one is undefined.")
-        with self.assertRaises(AssertionError):
+        with pytest.raises(AssertionError):
             assert_stderr_clean(err)
 
     def test_invalid_error_after_valid_error(self):
@@ -49,7 +49,7 @@ class TestAssertStderrClean(TestCase):
               /Contents/Home/bin/java and /Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/jre/lib/libinstrument.dylib.
                One of the two will be used. Which one is undefined.
                This string is no good and should fail."""
-        with self.assertRaises(AssertionError):
+        with pytest.raises(AssertionError):
             assert_stderr_clean(err)
 
     def test_invalid_error_before_valid_errors(self):
@@ -57,13 +57,13 @@ class TestAssertStderrClean(TestCase):
                  Class JavaLaunchHelper is implemented in both 
/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/bin/java
                  and 
/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/jre/lib/libinstrument.dylib.
 One of the two will be used. Which one is undefined.
                  """
-        with self.assertRaises(AssertionError):
+        with pytest.raises(AssertionError):
             assert_stderr_clean(err)
 
 
 class TestAssertionMethods(TestCase):
 
-    def assertions_test(self):
+    def test_assertions(self):
         # assert_exception_test
         mock_session = Mock(**{'execute.side_effect': AlreadyExists("Dummy exception message.")})
         assert_exception(mock_session, "DUMMY QUERY", expected=AlreadyExists)
@@ -111,5 +111,5 @@ class TestAssertionMethods(TestCase):
         assert_almost_equal(1, 1.1, 1.3, error=.31)
 
     def test_almost_equal_expect_failure(self):
-        with self.assertRaises(AssertionError):
+        with pytest.raises(AssertionError):
             assert_almost_equal(1, 1.3, error=.1)
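
For reference, the assertion-conversion pattern applied in the hunks above, as a minimal standalone sketch (the helper below is illustrative and only stands in for tools.assertions.assert_stderr_clean):

    import pytest

    def assert_stderr_clean(err):
        # stand-in for the dtest helper; raises when stderr has unexpected content
        if err.strip():
            raise AssertionError("unexpected stderr: {}".format(err))

    # unittest style (removed):  with self.assertRaises(AssertionError): ...
    # pytest style (added): module-level pytest.raises, and methods renamed to
    # test_* so pytest's default collector picks them up
    def test_invalid_error():
        with pytest.raises(AssertionError):
            assert_stderr_clean("This string is no good and should fail.")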

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/meta_tests/utils_test/funcutils_test.py
----------------------------------------------------------------------
diff --git a/meta_tests/utils_test/funcutils_test.py b/meta_tests/utils_test/funcutils_test.py
index 3bbddd5..4d99c36 100644
--- a/meta_tests/utils_test/funcutils_test.py
+++ b/meta_tests/utils_test/funcutils_test.py
@@ -18,7 +18,7 @@ class Testget_rate_limited_function(TestCase):
         """
         self.assertIs(rate_limited_func_arg.func, self.mock_func)
         self.assertIs(rate_limited_func_arg.limit, self.mock_limit)
-        self.assertEqual(rate_limited_func_arg.last_called, False)
+        assert rate_limited_func_arg.last_called == False
 
     def test_init_with_positional_args(self):
         """
@@ -106,16 +106,16 @@ class Testget_rate_limited_function(TestCase):
         If you call a rate-limited function, last_called is set to a new value.
         """
         self.rate_limited_func.limit = 1
-        self.assertEqual(self.rate_limited_func.last_called, False)
+        assert self.rate_limited_func.last_called == False
         self.rate_limited_func()
-        self.assertAlmostEqual(self.rate_limited_func.last_called, time(), places=2)
+        assert abs(round(self.rate_limited_func.last_called, 2) - round(time(), 2)) <= 0.0
 
     def test_last_called_not_set_when_called_within_time_limit(self):
         """
         If you call a rate-limited function during the time limit, last_called is not set to a new value.
         """
         self.rate_limited_func.limit = 1
-        self.assertEqual(self.rate_limited_func.last_called, False)
+        assert self.rate_limited_func.last_called == False
         self.rate_limited_func()
         last_called = self.rate_limited_func.last_called
         self.rate_limited_func()
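
The assertAlmostEqual(..., places=2) calls above are rewritten as bare asserts over rounded values; an equivalent form with an explicit tolerance is sketched below (standalone, not part of the commit):

    import time

    def check_called_recently(last_called, tolerance=0.01):
        # assertAlmostEqual(last_called, time.time(), places=2) is roughly:
        assert abs(last_called - time.time()) < tolerance

    check_called_recently(time.time())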

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/meta_tests/utils_test/metadata_wrapper_test.py
----------------------------------------------------------------------
diff --git a/meta_tests/utils_test/metadata_wrapper_test.py b/meta_tests/utils_test/metadata_wrapper_test.py
index 9009eac..4a8afc5 100644
--- a/meta_tests/utils_test/metadata_wrapper_test.py
+++ b/meta_tests/utils_test/metadata_wrapper_test.py
@@ -10,7 +10,7 @@ from tools.metadata_wrapper import (UpdatingClusterMetadataWrapper,
 
 
 class UpdatingMetadataWrapperBaseTest(TestCase):
-    def all_subclasses_known_test(self):
+    def test_all_subclasses_known(self):
         """
         Test that all the subclasses of UpdatingMetadataWrapperBase are known
         to this test suite. Basically, this will slap us on the wrist in the
@@ -40,7 +40,7 @@ class UpdatingMetadataWrapperBaseTest(TestCase):
             init_args = [MagicMock() for _ in range(init_arg_len - 1)]
             yield klaus(*init_args)
 
-    def all_subclasses_defer_getattr_test(self):
+    def test_all_subclasses_defer_getattr(self):
         """
         Each subclass should defer its attribute accesses to the wrapped
         object.
@@ -48,7 +48,7 @@ class UpdatingMetadataWrapperBaseTest(TestCase):
         for wrapper in self._each_subclass_instantiated_with_mock_args():
             self.assertIs(wrapper.foo, wrapper._wrapped.foo)
 
-    def all_subclasses_defer_getitem_test(self):
+    def test_all_subclasses_defer_getitem(self):
         """
         Each subclass should defer its item accesses to the wrapped object.
         """
@@ -57,8 +57,8 @@ class UpdatingMetadataWrapperBaseTest(TestCase):
             # from _wrapped[Y] for all Y
             wrapper._wrapped.__getitem__.side_effect = hash
             # check mocking correctness
-            self.assertNotEqual(wrapper['foo'], wrapper._wrapped['bar'])
-            self.assertEqual(wrapper['bar'], wrapper._wrapped['bar'])
+            assert wrapper['foo'] != wrapper._wrapped['bar']
+            assert wrapper['bar'] == wrapper._wrapped['bar']
 
 
 class UpdatingTableMetadataWrapperTest(TestCase):
@@ -74,7 +74,7 @@ class UpdatingTableMetadataWrapperTest(TestCase):
             max_schema_agreement_wait=self.max_schema_agreement_wait_sentinel
         )
 
-    def wrapped_access_calls_refresh_test(self):
+    def test_wrapped_access_calls_refresh(self):
         """
         Accessing the wrapped object should call the table-refreshing method on
         the cluster.
@@ -87,7 +87,7 @@ class UpdatingTableMetadataWrapperTest(TestCase):
             max_schema_agreement_wait=self.max_schema_agreement_wait_sentinel
         )
 
-    def default_wrapper_max_schema_agreement_wait_is_None_test(self):
+    def test_default_wrapper_max_schema_agreement_wait_is_None(self):
         wrapper = UpdatingTableMetadataWrapper(
             cluster=self.cluster_mock,
             ks_name=self.ks_name_sentinel,
@@ -100,7 +100,7 @@ class UpdatingTableMetadataWrapperTest(TestCase):
             max_schema_agreement_wait=None
         )
 
-    def wrapped_returns_table_metadata_test(self):
+    def test_wrapped_returns_table_metadata(self):
         """
         The wrapped object is accessed correctly from the internal cluster object.
         """
@@ -115,17 +115,12 @@ class UpdatingTableMetadataWrapperTest(TestCase):
         keyspaces_defaultdict[self.ks_name_sentinel].tables.__getitem__.side_effect = hash
 
         # check mocking correctness
-        self.assertNotEqual(
-            self.wrapper._wrapped,
-            self.cluster_mock.metadata.keyspaces[self.ks_name_sentinel].tables['foo']
-        )
+        assert self.wrapper._wrapped != self.cluster_mock.metadata.keyspaces[self.ks_name_sentinel].tables['foo']
+
         # and this is the behavior we care about
-        self.assertEqual(
-            self.wrapper._wrapped,
-            self.cluster_mock.metadata.keyspaces[self.ks_name_sentinel].tables[self.table_name_sentinel]
-        )
+        assert self.wrapper._wrapped == self.cluster_mock.metadata.keyspaces[self.ks_name_sentinel].tables[self.table_name_sentinel]
 
-    def repr_test(self):
+    def test_repr(self):
         self.assertEqual(
             repr(self.wrapper),
             'UpdatingTableMetadataWrapper(cluster={}, ks_name={}, table_name={}, max_schema_agreement_wait={})'.format(
@@ -145,7 +140,7 @@ class UpdatingKeyspaceMetadataWrapperTest(TestCase):
             max_schema_agreement_wait=self.max_schema_agreement_wait_sentinel
         )
 
-    def wrapped_access_calls_refresh_test(self):
+    def test_wrapped_access_calls_refresh(self):
         """
         Accessing the wrapped object should call the keyspace-refreshing method
         on the cluster.
@@ -157,7 +152,7 @@ class UpdatingKeyspaceMetadataWrapperTest(TestCase):
             max_schema_agreement_wait=self.max_schema_agreement_wait_sentinel
         )
 
-    def default_wrapper_max_schema_agreement_wait_is_None_test(self):
+    def test_default_wrapper_max_schema_agreement_wait_is_None(self):
         wrapper = UpdatingKeyspaceMetadataWrapper(
             cluster=self.cluster_mock,
             ks_name=self.ks_name_sentinel
@@ -168,7 +163,7 @@ class UpdatingKeyspaceMetadataWrapperTest(TestCase):
             max_schema_agreement_wait=None
         )
 
-    def wrapped_returns_keyspace_metadata_test(self):
+    def test_wrapped_returns_keyspace_metadata(self):
         """
         The wrapped object is accessed correctly from the internal cluster object.
         """
@@ -176,10 +171,10 @@ class UpdatingKeyspaceMetadataWrapperTest(TestCase):
         # from keyspaces[Y] for all Y
         self.cluster_mock.metadata.keyspaces.__getitem__.side_effect = hash
         # check mocking correctness
-        self.assertNotEqual(self.wrapper._wrapped, self.cluster_mock.metadata.keyspaces['foo'])
-        self.assertEqual(self.wrapper._wrapped, self.cluster_mock.metadata.keyspaces[self.ks_name_sentinel])
+        assert self.wrapper._wrapped != self.cluster_mock.metadata.keyspaces['foo']
+        assert self.wrapper._wrapped == self.cluster_mock.metadata.keyspaces[self.ks_name_sentinel]
 
-    def repr_test(self):
+    def test_repr(self):
         self.assertEqual(
             repr(self.wrapper),
             'UpdatingKeyspaceMetadataWrapper(cluster={}, ks_name={}, max_schema_agreement_wait={})'.format(
@@ -198,7 +193,7 @@ class UpdatingClusterMetadataWrapperTest(TestCase):
             max_schema_agreement_wait=self.max_schema_agreement_wait_sentinel
         )
 
-    def wrapped_access_calls_refresh_test(self):
+    def test_wrapped_access_calls_refresh(self):
         """
         Accessing the wrapped object should call the schema-refreshing method
         on the cluster.
@@ -209,20 +204,20 @@ class UpdatingClusterMetadataWrapperTest(TestCase):
             max_schema_agreement_wait=self.max_schema_agreement_wait_sentinel
         )
 
-    def default_wrapper_max_schema_agreement_wait_is_None_test(self):
+    def test_default_wrapper_max_schema_agreement_wait_is_None(self):
         wrapper = UpdatingClusterMetadataWrapper(cluster=self.cluster_mock)
         wrapper._wrapped
         self.cluster_mock.refresh_schema_metadata.assert_called_once_with(
             max_schema_agreement_wait=None
         )
 
-    def wrapped_returns_cluster_metadata_test(self):
+    def test_wrapped_returns_cluster_metadata(self):
         """
         The wrapped object is accessed correctly from the internal cluster object.
         """
         self.assertIs(self.wrapper._wrapped, self.cluster_mock.metadata)
 
-    def repr_test(self):
+    def test_repr(self):
         self.assertEqual(
             repr(self.wrapper),
             'UpdatingClusterMetadataWrapper(cluster={}, max_schema_agreement_wait={})'.format(
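
The wrapper tests above rely on a small mocking idiom: setting __getitem__.side_effect = hash makes distinct keys return distinct values, so the equality and inequality checks against the wrapped object are meaningful. A minimal standalone sketch:

    from unittest.mock import MagicMock

    wrapped = MagicMock()
    # wrapped[key] now returns hash(key), so different keys yield different values
    wrapped.__getitem__.side_effect = hash

    assert wrapped['foo'] != wrapped['bar']
    assert wrapped['bar'] == wrapped['bar']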

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/metadata_test.py
----------------------------------------------------------------------
diff --git a/metadata_test.py b/metadata_test.py
new file mode 100644
index 0000000..90141e7
--- /dev/null
+++ b/metadata_test.py
@@ -0,0 +1,68 @@
+import threading
+import time
+import logging
+import pytest
+
+from dtest import Tester
+
+logger = logging.getLogger(__name__)
+
+
+class TestMetadata(Tester):
+
+    def force_compact(self):
+        cluster = self.cluster
+        (node1, node2) = cluster.nodelist()
+        node1.nodetool("compact keyspace1 standard1")
+
+    def force_repair(self):
+        cluster = self.cluster
+        (node1, node2) = cluster.nodelist()
+        node1.nodetool('repair keyspace1 standard1')
+
+    def do_read(self):
+        cluster = self.cluster
+        (node1, node2) = cluster.nodelist()
+
+        node1.stress(['read', 'no-warmup', 'n=30000', '-schema', 'replication(factor=2)', 'compression=LZ4Compressor',
+                      '-rate', 'threads=1'])
+
+    @pytest.mark.skip(reason='hangs CI')
+    def test_metadata_reset_while_compact(self):
+        """
+        Resets the schema while a compact, read and repair happens.
+        All kinds of glorious things can fail.
+        """
+        # while the schema is being reset, there will inevitably be some
+        # queries that will error with this message
+        self.fixture_dtest_setup.ignore_log_patterns = ['.*Unknown keyspace/cf pair.*']
+
+        cluster = self.cluster
+        cluster.populate(2).start(wait_other_notice=True)
+        (node1, node2) = cluster.nodelist()
+
+        node1.nodetool("disableautocompaction")
+        node1.nodetool("setcompactionthroughput 1")
+
+        for i in range(3):
+            node1.stress(['write', 'no-warmup', 'n=30000', '-schema', 'replication(factor=2)',
+                          'compression=LZ4Compressor', '-rate', 'threads=5', '-pop', 'seq=1..30000'])
+            node1.flush()
+
+        thread = threading.Thread(target=self.force_compact)
+        thread.start()
+        time.sleep(1)
+
+        thread2 = threading.Thread(target=self.force_repair)
+        thread2.start()
+        time.sleep(5)
+
+        thread3 = threading.Thread(target=self.do_read)
+        thread3.start()
+        time.sleep(5)
+
+        node1.nodetool("resetlocalschema")
+
+        thread.join()
+        thread2.join()
+        thread3.join()

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/metadata_tests.py
----------------------------------------------------------------------
diff --git a/metadata_tests.py b/metadata_tests.py
deleted file mode 100644
index 9c5ee2e..0000000
--- a/metadata_tests.py
+++ /dev/null
@@ -1,65 +0,0 @@
-import threading
-import time
-from unittest import skip
-
-from dtest import Tester
-
-
-class TestMetadata(Tester):
-
-    def force_compact(self):
-        cluster = self.cluster
-        (node1, node2) = cluster.nodelist()
-        node1.nodetool("compact keyspace1 standard1")
-
-    def force_repair(self):
-        cluster = self.cluster
-        (node1, node2) = cluster.nodelist()
-        node1.nodetool('repair keyspace1 standard1')
-
-    def do_read(self):
-        cluster = self.cluster
-        (node1, node2) = cluster.nodelist()
-
-        node1.stress(['read', 'no-warmup', 'n=30000', '-schema', 
'replication(factor=2)', 'compression=LZ4Compressor',
-                      '-rate', 'threads=1'])
-
-    @skip('hangs CI')
-    def metadata_reset_while_compact_test(self):
-        """
-        Resets the schema while a compact, read and repair happens.
-        All kinds of glorious things can fail.
-        """
-
-        # while the schema is being reset, there will inevitably be some
-        # queries that will error with this message
-        self.ignore_log_patterns = '.*Unknown keyspace/cf pair.*'
-
-        cluster = self.cluster
-        cluster.populate(2).start(wait_other_notice=True)
-        (node1, node2) = cluster.nodelist()
-
-        node1.nodetool("disableautocompaction")
-        node1.nodetool("setcompactionthroughput 1")
-
-        for i in range(3):
-            node1.stress(['write', 'no-warmup', 'n=30000', '-schema', 
'replication(factor=2)', 'compression=LZ4Compressor', '-rate', 'threads=5', 
'-pop', 'seq=1..30000'])
-            node1.flush()
-
-        thread = threading.Thread(target=self.force_compact)
-        thread.start()
-        time.sleep(1)
-
-        thread2 = threading.Thread(target=self.force_repair)
-        thread2.start()
-        time.sleep(5)
-
-        thread3 = threading.Thread(target=self.do_read)
-        thread3.start()
-        time.sleep(5)
-
-        node1.nodetool("resetlocalschema")
-
-        thread.join()
-        thread2.join()
-        thread3.join()
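
The move from metadata_tests.py to metadata_test.py also swaps unittest's @skip for a pytest marker and the test_* naming; the two forms side by side (a sketch, not dtest code):

    import pytest
    from unittest import skip

    @skip('hangs CI')                       # old nose/unittest style
    def metadata_reset_while_compact_test():
        pass

    @pytest.mark.skip(reason='hangs CI')    # new pytest style, test_* naming
    def test_metadata_reset_while_compact():
        pass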

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/mixed_version_test.py
----------------------------------------------------------------------
diff --git a/mixed_version_test.py b/mixed_version_test.py
index 9da28b9..6fc656a 100644
--- a/mixed_version_test.py
+++ b/mixed_version_test.py
@@ -1,8 +1,13 @@
+import pytest
+import logging
+
 from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout
 from cassandra.query import SimpleStatement
 
-from dtest import Tester, debug
-from tools.decorators import since
+from dtest import Tester
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class TestSchemaChanges(Tester):
@@ -20,19 +25,20 @@ class TestSchemaChanges(Tester):
 
         node1, node2 = cluster.nodelist()
         original_version = node1.get_cassandra_version()
+        upgraded_version = None
         if original_version.vstring.startswith('2.0'):
             upgraded_version = 'github:apache/cassandra-2.1'
         elif original_version.vstring.startswith('2.1'):
             upgraded_version = 'github:apache/cassandra-2.2'
         else:
-            self.skip("This test is only designed to work with 2.0 and 2.1 
right now")
+            pytest.skip(msg="This test is only designed to work with 2.0 and 
2.1 right now")
 
         # start out with a major behind the previous version
 
         # upgrade node1
         node1.stop()
         node1.set_install_dir(version=upgraded_version)
-        debug("Set new cassandra dir for %s: %s" % (node1.name, 
node1.get_install_dir()))
+        logger.debug("Set new cassandra dir for %s: %s" % (node1.name, 
node1.get_install_dir()))
 
         node1.set_log_level("INFO")
         node1.start()
@@ -40,8 +46,9 @@ class TestSchemaChanges(Tester):
         session = self.patient_exclusive_cql_connection(node1)
         session.cluster.max_schema_agreement_wait = -1  # don't wait for 
schema agreement
 
-        debug("Creating keyspace and table")
-        session.execute("CREATE KEYSPACE test_upgrades WITH 
replication={'class': 'SimpleStrategy', 'replication_factor': '2'}")
+        logger.debug("Creating keyspace and table")
+        session.execute("CREATE KEYSPACE test_upgrades WITH 
replication={'class': 'SimpleStrategy', "
+                        "'replication_factor': '2'}")
         session.execute("CREATE TABLE test_upgrades.foo (a int primary key, b 
int)")
 
         pattern = r".*Got .* command for nonexistent table test_upgrades.foo.*"
@@ -50,15 +57,16 @@ class TestSchemaChanges(Tester):
             session.execute(SimpleStatement("SELECT * FROM test_upgrades.foo", 
consistency_level=ConsistencyLevel.ALL))
             self.fail("expected failure")
         except (ReadTimeout, OperationTimedOut):
-            debug("Checking node2 for warning in log")
+            logger.debug("Checking node2 for warning in log")
             node2.watch_log_for(pattern, timeout=10)
 
         # non-paged range slice
         try:
-            session.execute(SimpleStatement("SELECT * FROM test_upgrades.foo", 
consistency_level=ConsistencyLevel.ALL, fetch_size=None))
+            session.execute(SimpleStatement("SELECT * FROM test_upgrades.foo", 
consistency_level=ConsistencyLevel.ALL,
+                                            fetch_size=None))
             self.fail("expected failure")
         except (ReadTimeout, OperationTimedOut):
-            debug("Checking node2 for warning in log")
+            logger.debug("Checking node2 for warning in log")
             pattern = r".*Got .* command for nonexistent table 
test_upgrades.foo.*"
             node2.watch_log_for(pattern, timeout=10)
 
@@ -69,6 +77,6 @@ class TestSchemaChanges(Tester):
                                                 
consistency_level=ConsistencyLevel.ALL, fetch_size=None))
             self.fail("expected failure")
         except (ReadTimeout, OperationTimedOut):
-            debug("Checking node2 for warning in log")
+            logger.debug("Checking node2 for warning in log")
             pattern = r".*Got .* command for nonexistent table 
test_upgrades.foo.*"
             node2.watch_log_for(pattern, timeout=10)
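
Throughout this file the dtest debug() helper is replaced by a module-level stdlib logger; the pattern in isolation (function and message are illustrative):

    import logging

    logger = logging.getLogger(__name__)

    def report_install_dir(name, install_dir):
        # before: debug("Set new cassandra dir for %s: %s" % (name, install_dir))
        # after: a module-level logger, configured by pytest's logging plugin
        logger.debug("Set new cassandra dir for %s: %s", name, install_dir)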

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/multidc_putget_test.py
----------------------------------------------------------------------
diff --git a/multidc_putget_test.py b/multidc_putget_test.py
index bad3fad..3af0c9a 100644
--- a/multidc_putget_test.py
+++ b/multidc_putget_test.py
@@ -1,10 +1,14 @@
+import logging
+
 from dtest import Tester, create_ks, create_cf
 from tools.data import putget
 
+logger = logging.getLogger(__name__)
+
 
 class TestMultiDCPutGet(Tester):
 
-    def putget_2dc_rf1_test(self):
+    def test_putget_2dc_rf1(self):
         """ Simple put-get test for 2 DC with one node each (RF=1) [catches 
#3539] """
         cluster = self.cluster
         cluster.populate([1, 1]).start()
@@ -15,7 +19,7 @@ class TestMultiDCPutGet(Tester):
 
         putget(cluster, session)
 
-    def putget_2dc_rf2_test(self):
+    def test_putget_2dc_rf2(self):
         """ Simple put-get test for 2 DC with 2 node each (RF=2) -- tests 
cross-DC efficient writes """
         cluster = self.cluster
         cluster.populate([2, 2]).start()

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/native_transport_ssl_test.py
----------------------------------------------------------------------
diff --git a/native_transport_ssl_test.py b/native_transport_ssl_test.py
index 4716c80..284617d 100644
--- a/native_transport_ssl_test.py
+++ b/native_transport_ssl_test.py
@@ -1,20 +1,24 @@
 import os
+import pytest
+import logging
 
 from cassandra import ConsistencyLevel
 from cassandra.cluster import NoHostAvailable
 
 from dtest import Tester, create_ks, create_cf
 from tools.data import putget
-from tools.decorators import since
 from tools.misc import generate_ssl_stores
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
-class NativeTransportSSL(Tester):
+
+class TestNativeTransportSSL(Tester):
     """
     Native transport integration tests, specifically for ssl and port configurations.
     """
 
-    def connect_to_ssl_test(self):
+    def test_connect_to_ssl(self):
         """
         Connecting to SSL enabled native transport port should only be 
possible using SSL enabled client
         """
@@ -30,13 +34,13 @@ class NativeTransportSSL(Tester):
         except NoHostAvailable:
             pass
 
-        self.assertGreater(len(node1.grep_log("io.netty.handler.ssl.NotSslRecordException.*")), 0, "Missing SSL handshake exception while connecting with non-SSL enabled client")
+        assert len(node1.grep_log("io.netty.handler.ssl.NotSslRecordException.*")) > 0, "Missing SSL handshake exception while connecting with non-SSL enabled client"
 
         # enabled ssl on the client and try again (this should work)
-        session = self.patient_cql_connection(node1, ssl_opts={'ca_certs': os.path.join(self.test_path, 'ccm_node.cer')})
+        session = self.patient_cql_connection(node1, ssl_opts={'ca_certs': os.path.join(self.fixture_dtest_setup.test_path, 'ccm_node.cer')})
         self._putget(cluster, session)
 
-    def connect_to_ssl_optional_test(self):
+    def test_connect_to_ssl_optional(self):
         """
         Connecting to SSL optional native transport port must be possible with 
SSL and non-SSL native clients
         @jira_ticket CASSANDRA-10559
@@ -50,14 +54,13 @@ class NativeTransportSSL(Tester):
         self._putget(cluster, session)
 
         # enabled ssl on the client and try again (this should work)
-        session = self.patient_cql_connection(node1, ssl_opts={'ca_certs': os.path.join(self.test_path, 'ccm_node.cer')})
+        session = self.patient_cql_connection(node1, ssl_opts={'ca_certs': os.path.join(self.fixture_dtest_setup.test_path, 'ccm_node.cer')})
         self._putget(cluster, session, ks='ks2')
 
-    def use_custom_port_test(self):
+    def test_use_custom_port(self):
         """
         Connect to non-default native transport port
         """
-
         cluster = self._populateCluster(nativePort=9567)
         node1 = cluster.nodelist()[0]
 
@@ -72,12 +75,11 @@ class NativeTransportSSL(Tester):
         self._putget(cluster, session)
 
     @since('3.0')
-    def use_custom_ssl_port_test(self):
+    def test_use_custom_ssl_port(self):
         """
         Connect to additional ssl enabled native transport port
         @jira_ticket CASSANDRA-9590
         """
-
         cluster = self._populateCluster(enableSSL=True, nativePortSSL=9666)
         node1 = cluster.nodelist()[0]
         cluster.start()
@@ -87,14 +89,14 @@ class NativeTransportSSL(Tester):
         self._putget(cluster, session)
 
         # connect to additional dedicated ssl port
-        session = self.patient_cql_connection(node1, port=9666, ssl_opts={'ca_certs': os.path.join(self.test_path, 'ccm_node.cer')})
+        session = self.patient_cql_connection(node1, port=9666, ssl_opts={'ca_certs': os.path.join(self.fixture_dtest_setup.test_path, 'ccm_node.cer')})
         self._putget(cluster, session, ks='ks2')
 
     def _populateCluster(self, enableSSL=False, nativePort=None, 
nativePortSSL=None, sslOptional=False):
         cluster = self.cluster
 
         if enableSSL:
-            generate_ssl_stores(self.test_path)
+            generate_ssl_stores(self.fixture_dtest_setup.test_path)
             # C* versions before 3.0 (CASSANDRA-10559) do not know about
             # 'client_encryption_options.optional' - so we must not add that 
parameter
             if sslOptional:
@@ -102,7 +104,7 @@ class NativeTransportSSL(Tester):
                     'client_encryption_options': {
                         'enabled': True,
                         'optional': sslOptional,
-                        'keystore': os.path.join(self.test_path, 'keystore.jks'),
+                        'keystore': os.path.join(self.fixture_dtest_setup.test_path, 'keystore.jks'),
                         'keystore_password': 'cassandra'
                     }
                 })
@@ -110,7 +112,7 @@ class NativeTransportSSL(Tester):
                 cluster.set_configuration_options({
                     'client_encryption_options': {
                         'enabled': True,
-                        'keystore': os.path.join(self.test_path, 'keystore.jks'),
+                        'keystore': os.path.join(self.fixture_dtest_setup.test_path, 'keystore.jks'),
                         'keystore_password': 'cassandra'
                     }
                 })

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/nodetool_test.py
----------------------------------------------------------------------
diff --git a/nodetool_test.py b/nodetool_test.py
index b00e442..e913b30 100644
--- a/nodetool_test.py
+++ b/nodetool_test.py
@@ -1,12 +1,19 @@
 import os
+import pytest
+import re
+import logging
+
 from cassandra import ConsistencyLevel
 from cassandra.query import SimpleStatement
 from ccmlib.node import ToolError
-from dtest import Tester, debug, create_ks
+
+from dtest import Tester, create_ks
 from tools.assertions import assert_all, assert_invalid, assert_none
-from tools.decorators import since
 from tools.jmxutils import JolokiaAgent, make_mbean, remove_perf_disable_shared_mem
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 
 class TestNodetool(Tester):
 
@@ -26,10 +33,10 @@ class TestNodetool(Tester):
 
         try:
             node.decommission()
-            self.assertFalse("Expected nodetool error")
+            assert not "Expected nodetool error"
         except ToolError as e:
-            self.assertEqual('', e.stderr)
-            self.assertTrue('Unsupported operation' in e.stdout)
+            assert '' == e.stderr
+            assert 'Unsupported operation' in e.stdout
 
     def test_correct_dc_rack_in_nodetool_info(self):
         """
@@ -51,16 +58,19 @@ class TestNodetool(Tester):
 
         for i, node in enumerate(cluster.nodelist()):
             out, err, _ = node.nodetool('info')
-            self.assertEqual(0, len(err), err)
-            debug(out)
-            for line in out.split(os.linesep):
+            assert 0 == len(err), err
+            out_str = out
+            if isinstance(out, (bytes, bytearray)):
+                out_str = out.decode("utf-8")
+            logger.debug(out_str)
+            for line in out_str.split(os.linesep):
                 if line.startswith('Data Center'):
-                    self.assertTrue(line.endswith(node.data_center),
-                                    "Expected dc {} for {} but got {}".format(node.data_center, node.address(), line.rsplit(None, 1)[-1]))
+                    assert line.endswith(node.data_center), \
+                        "Expected dc {} for {} but got {}".format(node.data_center, node.address(), line.rsplit(None, 1)[-1])
                 elif line.startswith('Rack'):
                     rack = "rack{}".format(i % 2)
-                    self.assertTrue(line.endswith(rack),
-                                    "Expected rack {} for {} but got {}".format(rack, node.address(), line.rsplit(None, 1)[-1]))
+                    assert line.endswith(rack), \
+                        "Expected rack {} for {} but got {}".format(rack, node.address(), line.rsplit(None, 1)[-1])
 
     @since('3.4')
     def test_nodetool_timeout_commands(self):
@@ -81,21 +91,21 @@ class TestNodetool(Tester):
         # read all of the timeouts, make sure we get a sane response
         for timeout_type in types:
             out, err, _ = node.nodetool('gettimeout {}'.format(timeout_type))
-            self.assertEqual(0, len(err), err)
-            debug(out)
-            self.assertRegexpMatches(out, r'.* \d+ ms')
+            assert 0 == len(err), err
+            logger.debug(out)
+            assert re.search(r'.* \d+ ms', out)
 
         # set all of the timeouts to 123
         for timeout_type in types:
             _, err, _ = node.nodetool('settimeout {} 123'.format(timeout_type))
-            self.assertEqual(0, len(err), err)
+            assert 0 == len(err), err
 
         # verify that they're all reported as 123
         for timeout_type in types:
             out, err, _ = node.nodetool('gettimeout {}'.format(timeout_type))
-            self.assertEqual(0, len(err), err)
-            debug(out)
-            self.assertRegexpMatches(out, r'.* 123 ms')
+            assert 0 == len(err), err
+            logger.debug(out)
+            assert re.search(r'.* 123 ms', out)
 
     @since('3.0')
     def test_cleanup_when_no_replica_with_index(self):
@@ -132,9 +142,9 @@ class TestNodetool(Tester):
         self.cluster.flush()
 
         for node in self.cluster.nodelist():
-            self.assertNotEqual(0, len(node.get_sstables('ks', 'cf')))
+            assert 0 != len(node.get_sstables('ks', 'cf'))
         if with_index:
-            self.assertEqual(len(list(session_dc2.execute("SELECT * FROM ks.cf WHERE value = 'value'"))), 100)
+            assert 100 == len(list(session_dc2.execute("SELECT * FROM ks.cf WHERE value = 'value'")))
 
         # alter rf to only dc1
         session.execute("ALTER KEYSPACE ks WITH REPLICATION = {'class' : 
'NetworkTopologyStrategy', 'dc1' : 1, 'dc2' : 0};")
@@ -146,16 +156,16 @@ class TestNodetool(Tester):
         # check local data on dc2
         for node in self.cluster.nodelist():
             if node.data_center == 'dc2':
-                self.assertEqual(0, len(node.get_sstables('ks', 'cf')))
+                assert 0 == len(node.get_sstables('ks', 'cf'))
             else:
-                self.assertNotEqual(0, len(node.get_sstables('ks', 'cf')))
+                assert 0 != len(node.get_sstables('ks', 'cf'))
 
         # dc1 data remains
         statement = SimpleStatement("SELECT * FROM ks.cf", 
consistency_level=ConsistencyLevel.LOCAL_ONE)
-        self.assertEqual(len(list(session.execute(statement))), 100)
+        assert 100 == len(list(session.execute(statement)))
         if with_index:
             statement = SimpleStatement("SELECT * FROM ks.cf WHERE value = 
'value'", consistency_level=ConsistencyLevel.LOCAL_ONE)
-            self.assertEqual(len(list(session.execute(statement))), 100)
+            assert len(list(session.execute(statement))) == 100
 
         # alter rf back to query dc2, no data, no index
         session.execute("ALTER KEYSPACE ks WITH REPLICATION = {'class' : 
'NetworkTopologyStrategy', 'dc1' : 0, 'dc2' : 1};")
@@ -182,30 +192,30 @@ class TestNodetool(Tester):
 
         # Do a first try without any keypace, we shouldn't have the notice
         out, err, _ = node.nodetool('status')
-        self.assertEqual(0, len(err), err)
-        self.assertNotRegexpMatches(out, notice_message)
+        assert 0 == len(err), err
+        assert not re.search(notice_message, out)
 
         session = self.patient_cql_connection(node)
         session.execute("CREATE KEYSPACE ks1 WITH replication = { 
'class':'SimpleStrategy', 'replication_factor':1}")
 
         # With 1 keyspace, we should still not get the notice
         out, err, _ = node.nodetool('status')
-        self.assertEqual(0, len(err), err)
-        self.assertNotRegexpMatches(out, notice_message)
+        assert 0 == len(err), err
+        assert not re.search(notice_message, out)
 
         session.execute("CREATE KEYSPACE ks2 WITH replication = { 
'class':'SimpleStrategy', 'replication_factor':1}")
 
         # With 2 keyspaces with the same settings, we should not get the notice
         out, err, _ = node.nodetool('status')
-        self.assertEqual(0, len(err), err)
-        self.assertNotRegexpMatches(out, notice_message)
+        assert 0 == len(err), err
+        assert not re.search(notice_message, out)
 
         session.execute("CREATE KEYSPACE ks3 WITH replication = { 
'class':'SimpleStrategy', 'replication_factor':3}")
 
         # With a keyspace without the same replication factor, we should get 
the notice
         out, err, _ = node.nodetool('status')
-        self.assertEqual(0, len(err), err)
-        self.assertRegexpMatches(out, notice_message)
+        assert 0 == len(err), err
+        assert re.search(notice_message, out)
 
     @since('4.0')
     def test_set_get_batchlog_replay_throttle(self):
@@ -220,14 +230,14 @@ class TestNodetool(Tester):
         cluster.start()
 
         # Test that nodetool help messages are displayed
-        self.assertTrue('Set batchlog replay throttle' in node.nodetool('help setbatchlogreplaythrottle').stdout)
-        self.assertTrue('Print batchlog replay throttle' in node.nodetool('help getbatchlogreplaythrottle').stdout)
+        assert 'Set batchlog replay throttle' in node.nodetool('help setbatchlogreplaythrottle').stdout
+        assert 'Print batchlog replay throttle' in node.nodetool('help getbatchlogreplaythrottle').stdout
 
         # Set and get throttle with nodetool, ensuring that the rate change is 
logged
         node.nodetool('setbatchlogreplaythrottle 2048')
-        self.assertTrue(len(node.grep_log('Updating batchlog replay throttle to 2048 KB/s, 1024 KB/s per endpoint',
-                                          filename='debug.log')) > 0)
-        self.assertTrue('Batchlog replay throttle: 2048 KB/s' in node.nodetool('getbatchlogreplaythrottle').stdout)
+        assert len(node.grep_log('Updating batchlog replay throttle to 2048 KB/s, 1024 KB/s per endpoint',
+                                 filename='debug.log')) > 0
+        assert 'Batchlog replay throttle: 2048 KB/s' in node.nodetool('getbatchlogreplaythrottle').stdout
 
     @since('3.0')
     def test_reloadlocalschema(self):
@@ -244,7 +254,8 @@ class TestNodetool(Tester):
 
         session = self.patient_cql_connection(node)
 
-        query = "CREATE KEYSPACE IF NOT EXISTS test WITH replication = 
{'class': 'NetworkTopologyStrategy', 'datacenter1': 2};"
+        query = "CREATE KEYSPACE IF NOT EXISTS test WITH replication " \
+                "= {'class': 'NetworkTopologyStrategy', 'datacenter1': 2};"
         session.execute(query)
 
         query = 'CREATE TABLE test.test (pk int, ck int, PRIMARY KEY (pk, 
ck));'
@@ -252,8 +263,6 @@ class TestNodetool(Tester):
 
         ss = make_mbean('db', type='StorageService')
 
-        schema_version = ''
-
         # get initial schema version
         with JolokiaAgent(node) as jmx:
             schema_version = jmx.read_attribute(ss, 'SchemaVersion')
@@ -270,7 +279,7 @@ class TestNodetool(Tester):
 
         # validate that schema version wasn't automatically updated
         with JolokiaAgent(node) as jmx:
-            self.assertEqual(schema_version, jmx.read_attribute(ss, 'SchemaVersion'))
+            assert schema_version == jmx.read_attribute(ss, 'SchemaVersion')
 
         # make sure the new column wasn't automagically picked up
         assert_invalid(session, 'INSERT INTO test.test (pk, ck, val) VALUES 
(0, 1, 2);')
@@ -280,7 +289,7 @@ class TestNodetool(Tester):
 
         # validate that schema version changed
         with JolokiaAgent(node) as jmx:
-            self.assertNotEqual(schema_version, jmx.read_attribute(ss, 'SchemaVersion'))
+            assert schema_version != jmx.read_attribute(ss, 'SchemaVersion')
 
         # try an insert with the new column again and validate it succeeds 
this time
         session.execute('INSERT INTO test.test (pk, ck, val) VALUES (0, 1, 
2);')
@@ -299,19 +308,19 @@ class TestNodetool(Tester):
         cluster.start()
 
         # Test that nodetool help messages are displayed
-        self.assertTrue('Set the number of concurrent view' in 
node.nodetool('help setconcurrentviewbuilders').stdout)
-        self.assertTrue('Get the number of concurrent view' in 
node.nodetool('help getconcurrentviewbuilders').stdout)
+        assert 'Set the number of concurrent view' in node.nodetool('help 
setconcurrentviewbuilders').stdout
+        assert 'Get the number of concurrent view' in node.nodetool('help 
getconcurrentviewbuilders').stdout
 
         # Set and get throttle with nodetool, ensuring that the rate change is 
logged
         node.nodetool('setconcurrentviewbuilders 4')
-        self.assertTrue('Current number of concurrent view builders in the system is: \n4'
-                        in node.nodetool('getconcurrentviewbuilders').stdout)
+        assert 'Current number of concurrent view builders in the system is: \n4' \
+               in node.nodetool('getconcurrentviewbuilders').stdout
 
         # Try to set an invalid zero value
         try:
             node.nodetool('setconcurrentviewbuilders 0')
         except ToolError as e:
-            self.assertTrue('concurrent_view_builders should be great than 0.' in e.stdout)
-            self.assertTrue('Number of concurrent view builders should be greater than 0.', e.message)
+            assert 'concurrent_view_builders should be great than 0.' in e.stdout
+            assert 'Number of concurrent view builders should be greater than 0.', e.message
         else:
             self.fail("Expected error when setting and invalid value")

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/offline_tools_test.py
----------------------------------------------------------------------
diff --git a/offline_tools_test.py b/offline_tools_test.py
index 028027d..7c7cc8f 100644
--- a/offline_tools_test.py
+++ b/offline_tools_test.py
@@ -3,21 +3,29 @@ import os
 import random
 import re
 import subprocess
+import pytest
+import logging
 
 from ccmlib import common
 from ccmlib.node import ToolError
 
-from dtest import Tester, debug, create_ks
-from tools.decorators import since
+from dtest import Tester, create_ks
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class TestOfflineTools(Tester):
 
-    # In 2.0, we will get this error log message due to jamm not being
-    # in the classpath
-    ignore_log_patterns = ["Unable to initialize MemoryMeter"]
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.ignore_log_patterns = (
+            # In 2.0, we will get this error log message due to jamm not being
+            # in the classpath
+            "Unable to initialize MemoryMeter"
+        )
 
-    def sstablelevelreset_test(self):
+    def test_sstablelevelreset(self):
         """
         Insert data and call sstablelevelreset on a series of
         tables. Confirm level is reset to 0 using its output.
@@ -34,9 +42,9 @@ class TestOfflineTools(Tester):
         try:
             node1.run_sstablelevelreset("keyspace1", "standard1")
         except ToolError as e:
-            self.assertIn("ColumnFamily not found: keyspace1/standard1", 
e.message)
+            assert re.search("ColumnFamily not found: keyspace1/standard1", 
str(e))
             # this should return exit code 1
-            self.assertEqual(e.exit_status, 1, "Expected sstablelevelreset to 
have a return code of 1, but instead return code was {}".format(e.exit_status))
+            assert e.exit_status == 1, "Expected sstablelevelreset to have a 
return code of 1 == but instead return code was {}".format(e.exit_status)
 
         # now test by generating keyspace but not flushing sstables
         cluster.start(wait_for_binary_proto=True)
@@ -46,8 +54,8 @@ class TestOfflineTools(Tester):
 
         output, error, rc = node1.run_sstablelevelreset("keyspace1", "standard1")
         self._check_stderr_error(error)
-        self.assertIn("Found no sstables, did you give the correct keyspace", output)
-        self.assertEqual(rc, 0, msg=str(rc))
+        assert re.search("Found no sstables, did you give the correct keyspace", output.decode("utf-8"))
+        assert rc == 0, str(rc)
 
         # test by writing small amount of data and flushing (all sstables 
should be level 0)
         cluster.start(wait_for_binary_proto=True)
@@ -59,9 +67,9 @@ class TestOfflineTools(Tester):
         cluster.stop(gently=False)
 
         output, error, rc = node1.run_sstablelevelreset("keyspace1", "standard1")
-        self._check_stderr_error(error)
-        self.assertIn("since it is already on level 0", output)
-        self.assertEqual(rc, 0, msg=str(rc))
+        self._check_stderr_error(error.decode("utf-8"))
+        assert re.search("since it is already on level 0", output.decode("utf-8"))
+        assert rc == 0, str(rc)
 
         # test by loading large amount data so we have multiple levels and 
checking all levels are 0 at end
         cluster.start(wait_for_binary_proto=True)
@@ -74,21 +82,21 @@ class TestOfflineTools(Tester):
         initial_levels = 
self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", 
column_families=["standard1"]))
         _, error, rc = node1.run_sstablelevelreset("keyspace1", "standard1")
         final_levels = 
self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", 
column_families=["standard1"]))
-        self._check_stderr_error(error)
-        self.assertEqual(rc, 0, msg=str(rc))
+        self._check_stderr_error(error.decode("utf-8"))
+        assert rc == 0, str(rc)
 
-        debug(initial_levels)
-        debug(final_levels)
+        logger.debug(initial_levels)
+        logger.debug(final_levels)
 
         # let's make sure there was at least L1 beforing resetting levels
-        self.assertTrue(max(initial_levels) > 0)
+        assert max(initial_levels) > 0
 
         # let's check all sstables are on L0 after sstablelevelreset
-        self.assertTrue(max(final_levels) == 0)
+        assert max(final_levels) == 0
 
     def get_levels(self, data):
         (out, err, rc) = data
-        return map(int, re.findall("SSTable Level: ([0-9])", out))
+        return list(map(int, re.findall("SSTable Level: ([0-9])", out.decode("utf-8"))))
 
     def wait_for_compactions(self, node):
         pattern = re.compile("pending tasks: 0")
@@ -97,7 +105,7 @@ class TestOfflineTools(Tester):
             if pattern.search(output):
                 break
 
-    def sstableofflinerelevel_test(self):
+    def test_sstableofflinerelevel(self):
         """
         Generate sstables of varying levels.
         Reset sstables to L0 with sstablelevelreset
@@ -115,9 +123,9 @@ class TestOfflineTools(Tester):
         # test by trying to run on nonexistent keyspace
         # cluster.stop(gently=False)
         # output, error, rc = node1.run_sstableofflinerelevel("keyspace1", "standard1", output=True)
-        # self.assertTrue("java.lang.IllegalArgumentException: Unknown keyspace/columnFamily keyspace1.standard1" in error)
+        # assert "java.lang.IllegalArgumentException: Unknown keyspace/columnFamily keyspace1.standard1" in error
         # # this should return exit code 1
-        # self.assertEqual(rc, 1, msg=str(rc))
+        # assert rc == 1, str(rc)
         # cluster.start()
 
         # now test by generating keyspace but not flushing sstables
@@ -131,13 +139,13 @@ class TestOfflineTools(Tester):
         try:
             output, error, _ = node1.run_sstableofflinerelevel("keyspace1", "standard1")
         except ToolError as e:
-            self.assertIn("No sstables to relevel for keyspace1.standard1", e.stdout)
-            self.assertEqual(e.exit_status, 1, msg=str(e.exit_status))
+            assert re.search("No sstables to relevel for keyspace1.standard1", e.stdout.decode("utf-8"))
+            assert e.exit_status == 1, str(e.exit_status)
 
         # test by flushing (sstable should be level 0)
         cluster.start(wait_for_binary_proto=True)
         session = self.patient_cql_connection(node1)
-        debug("Altering compaction strategy to LCS")
+        logger.debug("Altering compaction strategy to LCS")
         session.execute("ALTER TABLE keyspace1.standard1 with 
compaction={'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb':1};")
 
         node1.stress(['write', 'n=1K', 'no-warmup',
@@ -149,8 +157,8 @@ class TestOfflineTools(Tester):
         cluster.stop()
 
         output, _, rc = node1.run_sstableofflinerelevel("keyspace1", 
"standard1")
-        self.assertIn("L0=1", output)
-        self.assertEqual(rc, 0, msg=str(rc))
+        assert re.search("L0=1", output.decode("utf-8"))
+        assert rc == 0, str(rc)
 
         cluster.start(wait_for_binary_proto=True)
         # test by loading large amount data so we have multiple sstables
@@ -162,56 +170,55 @@ class TestOfflineTools(Tester):
                       '-rate', 'threads=8'])
 
         node1.flush()
-        debug("Waiting for compactions to finish")
+        logger.debug("Waiting for compactions to finish")
         self.wait_for_compactions(node1)
-        debug("Stopping node")
+        logger.debug("Stopping node")
         cluster.stop()
-        debug("Done stopping node")
+        logger.debug("Done stopping node")
 
         # Let's reset all sstables to L0
-        debug("Getting initial levels")
+        logger.debug("Getting initial levels")
         initial_levels = 
list(self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", 
column_families=["standard1"])))
-        self.assertNotEqual([], initial_levels)
-        debug('initial_levels:')
-        debug(initial_levels)
-        debug("Running sstablelevelreset")
+        assert [] != initial_levels
+        logger.debug('initial_levels:')
+        logger.debug(initial_levels)
+        logger.debug("Running sstablelevelreset")
         node1.run_sstablelevelreset("keyspace1", "standard1")
-        debug("Getting final levels")
+        logger.debug("Getting final levels")
         final_levels = 
list(self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", 
column_families=["standard1"])))
-        self.assertNotEqual([], final_levels)
-        debug('final levels:')
-        debug(final_levels)
+        assert [] != final_levels
+        logger.debug('final levels:')
+        logger.debug(final_levels)
 
         # let's make sure there was at least 3 levels (L0, L1 and L2)
-        self.assertGreater(max(initial_levels), 1)
+        assert max(initial_levels) > 1
         # let's check all sstables are on L0 after sstablelevelreset
-        self.assertEqual(max(final_levels), 0)
+        assert max(final_levels) == 0
 
         # time to relevel sstables
-        debug("Getting initial levels")
+        logger.debug("Getting initial levels")
         initial_levels = 
self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", 
column_families=["standard1"]))
-        debug("Running sstableofflinerelevel")
+        logger.debug("Running sstableofflinerelevel")
         output, error, _ = node1.run_sstableofflinerelevel("keyspace1", 
"standard1")
-        debug("Getting final levels")
+        logger.debug("Getting final levels")
         final_levels = 
self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", 
column_families=["standard1"]))
 
-        debug(output)
-        debug(error)
+        logger.debug(output)
+        logger.debug(error)
 
-        debug(initial_levels)
-        debug(final_levels)
+        logger.debug(initial_levels)
+        logger.debug(final_levels)
 
         # let's check sstables were promoted after releveling
-        self.assertGreater(max(final_levels), 1)
+        assert max(final_levels) > 1
 
     @since('2.2')
-    def sstableverify_test(self):
+    def test_sstableverify(self):
         """
         Generate sstables and test offline verification works correctly
         Test on bad input: nonexistent keyspace and sstables
         Test on potential situations: deleted sstables, corrupted sstables
         """
-
         cluster = self.cluster
         cluster.populate(3).start(wait_for_binary_proto=True)
         node1, node2, node3 = cluster.nodelist()
@@ -220,14 +227,14 @@ class TestOfflineTools(Tester):
         try:
             (out, err, rc) = node1.run_sstableverify("keyspace1", "standard1")
         except ToolError as e:
-            self.assertIn("Unknown keyspace/table keyspace1.standard1", 
e.message)
-            self.assertEqual(e.exit_status, 1, msg=str(e.exit_status))
+            assert "Unknown keyspace/table keyspace1.standard1" in repr(e)
+            assert e.exit_status == 1, str(e.exit_status)
 
         # test on nonexistent sstables:
         node1.stress(['write', 'n=100', 'no-warmup', '-schema', 
'replication(factor=3)',
                       '-rate', 'threads=8'])
         (out, err, rc) = node1.run_sstableverify("keyspace1", "standard1")
-        self.assertEqual(rc, 0, msg=str(rc))
+        assert rc == 0, str(rc)
 
         # Generate multiple sstables and test works properly in the simple case
         node1.stress(['write', 'n=100K', 'no-warmup', '-schema', 
'replication(factor=3)',
@@ -240,15 +247,14 @@ class TestOfflineTools(Tester):
 
         (out, error, rc) = node1.run_sstableverify("keyspace1", "standard1")
 
-        self.assertEqual(rc, 0, msg=str(rc))
+        assert rc == 0, str(rc)
 
         # STDOUT of the sstableverify command consists of multiple lines which may contain
         # Java-normalized paths. To later compare these with Python-normalized paths, we
         # map over each line of out and replace Java-normalized paths with Python equivalents.
-        outlines = map(lambda line: re.sub("(?<=path=').*(?=')",
+        outlines = [re.sub("(?<=path=').*(?=')",
                                            lambda match: os.path.normcase(match.group(0)),
-                                           line),
-                       out.splitlines())
+                                           line) for line in out.decode("utf-8").splitlines()]
 
         # check output is correct for each sstable
         sstables = self._get_final_sstables(node1, "keyspace1", "standard1")
@@ -263,18 +269,18 @@ class TestOfflineTools(Tester):
                     elif "Checking computed hash of BigTableReader" in line:
                         hashcomputed = True
                     else:
-                        debug(line)
+                        logger.debug(line)
 
-            debug(verified)
-            debug(hashcomputed)
-            debug(sstable)
-            self.assertTrue(verified and hashcomputed)
+            logger.debug(verified)
+            logger.debug(hashcomputed)
+            logger.debug(sstable)
+            assert verified and hashcomputed
 
         # now try intentionally corrupting an sstable to see if hash computed 
is different and error recognized
         sstable1 = sstables[1]
-        with open(sstable1, 'r') as f:
+        with open(sstable1, 'rb') as f:
             sstabledata = bytearray(f.read())
-        with open(sstable1, 'w') as out:
+        with open(sstable1, 'wb') as out:
             position = random.randrange(0, len(sstabledata))
             sstabledata[position] = (sstabledata[position] + 1) % 256
             out.write(sstabledata)
@@ -284,12 +290,12 @@ class TestOfflineTools(Tester):
             (out, error, rc) = node1.run_sstableverify("keyspace1", 
"standard1", options=['-v'])
         except ToolError as e:
             # Process sstableverify output to normalize paths in string to Python casing as above
-            error = re.sub("(?<=Corrupted: ).*", lambda match: os.path.normcase(match.group(0)), e.message)
+            error = re.sub("(?<=Corrupted: ).*", lambda match: os.path.normcase(match.group(0)), str(e))
 
-            self.assertIn("Corrupted: " + sstable1, error)
-            self.assertEqual(e.exit_status, 1, msg=str(e.exit_status))
+            assert re.search("Corrupted: " + sstable1, error)
+            assert e.exit_status == 1, str(e.exit_status)
 
-    def sstableexpiredblockers_test(self):
+    def test_sstableexpiredblockers(self):
         cluster = self.cluster
         cluster.populate(1).start(wait_for_binary_proto=True)
         [node1] = cluster.nodelist()
@@ -304,14 +310,14 @@ class TestOfflineTools(Tester):
         session.execute("delete from ks.cf where key = 3")
         node1.flush()
         out, error, _ = node1.run_sstableexpiredblockers(keyspace="ks", column_family="cf")
-        self.assertIn("blocks 2 expired sstables from getting dropped", out)
+        assert "blocks 2 expired sstables from getting dropped" in out.decode("utf-8")
 
     # 4.0 removes back compatibility with pre-3.0 versions, so testing 
upgradesstables for
     # paths from those versions to 4.0 is invalid (and can only fail). There 
isn't currently
     # any difference between the 3.0 and 4.0 sstable format though, but when 
the version is
     # bumped for 4.0, remove the max_version & add a case for testing a 3.0 -> 
4.0 upgrade
     @since('2.2', max_version='3.X')
-    def sstableupgrade_test(self):
+    def test_sstableupgrade(self):
         """
         Test that sstableupgrade functions properly offline on a same-version 
Cassandra sstable, a
         stdout message of "Found 0 sstables that need upgrading." should be 
returned.
@@ -320,7 +326,7 @@ class TestOfflineTools(Tester):
         cluster = self.cluster
         testversion = cluster.version()
         original_install_dir = cluster.get_install_dir()
-        debug('Original install dir: {}'.format(original_install_dir))
+        logger.debug('Original install dir: {}'.format(original_install_dir))
 
         # Set up last major version to upgrade from, assuming 2.1 branch is the oldest tested version
         if testversion < '2.2':
@@ -330,36 +336,36 @@ class TestOfflineTools(Tester):
             #   Error opening zip file or JAR manifest missing : /home/mshuler/git/cassandra/lib/jamm-0.2.5.jar
             # The 2.1 installed jamm version is 0.3.0, but bin/cassandra.in.sh used by nodetool still has 0.2.5
             # (when this is fixed in CCM issue #463, install version='github:apache/cassandra-2.0' as below)
-            self.skipTest('Skipping 2.1 test due to jamm.jar version upgrade problem in CCM node configuration.')
+            pytest.skip('Skipping 2.1 test due to jamm.jar version upgrade problem in CCM node configuration.')
         elif testversion < '3.0':
-            debug('Test version: {} - installing github:apache/cassandra-2.1'.format(testversion))
+            logger.debug('Test version: {} - installing github:apache/cassandra-2.1'.format(testversion))
             cluster.set_install_dir(version='github:apache/cassandra-2.1')
         # As of 3.5, sstable format 'ma' from 3.0 is still the latest - install 2.2 to upgrade from
         elif testversion < '4.0':
-            debug('Test version: {} - installing github:apache/cassandra-2.2'.format(testversion))
+            logger.debug('Test version: {} - installing github:apache/cassandra-2.2'.format(testversion))
             cluster.set_install_dir(version='github:apache/cassandra-2.2')
         # From 4.0, one can only upgrade from 3.0
         else:
-            debug('Test version: {} - installing github:apache/cassandra-3.0'.format(testversion))
+            logger.debug('Test version: {} - installing github:apache/cassandra-3.0'.format(testversion))
             cluster.set_install_dir(version='github:apache/cassandra-3.0')
 
         # Start up last major version, write out an sstable to upgrade, and stop node
         cluster.populate(1).start(wait_for_binary_proto=True)
         [node1] = cluster.nodelist()
         # Check that node1 is actually what we expect
-        debug('Downgraded install dir: {}'.format(node1.get_install_dir()))
+        logger.debug('Downgraded install dir: {}'.format(node1.get_install_dir()))
         session = self.patient_cql_connection(node1)
         create_ks(session, 'ks', 1)
         session.execute('create table ks.cf (key int PRIMARY KEY, val int) with gc_grace_seconds=0')
         session.execute('insert into ks.cf (key, val) values (1,1)')
         node1.flush()
         cluster.stop()
-        debug('Beginning ks.cf sstable: {}'.format(node1.get_sstables(keyspace='ks', column_family='cf')))
+        logger.debug('Beginning ks.cf sstable: {}'.format(node1.get_sstables(keyspace='ks', column_family='cf')))
 
         # Upgrade Cassandra to original testversion and run sstableupgrade
         cluster.set_install_dir(original_install_dir)
         # Check that node1 is actually upgraded
-        debug('Upgraded to original install dir: {}'.format(node1.get_install_dir()))
+        logger.debug('Upgraded to original install dir: {}'.format(node1.get_install_dir()))
         # Perform a node start/stop so system tables get internally updated, otherwise we may get "Unknown keyspace/table ks.cf"
         cluster.start(wait_for_binary_proto=True)
         node1.flush()
@@ -372,19 +378,19 @@ class TestOfflineTools(Tester):
         # change before it's release.
         if testversion < '4.0':
             (out, error, rc) = node1.run_sstableupgrade(keyspace='ks', column_family='cf')
-            debug(out)
-            debug(error)
-            debug('Upgraded ks.cf sstable: {}'.format(node1.get_sstables(keyspace='ks', column_family='cf')))
-            self.assertIn('Found 1 sstables that need upgrading.', out)
+            logger.debug(out)
+            logger.debug(error)
+            logger.debug('Upgraded ks.cf sstable: {}'.format(node1.get_sstables(keyspace='ks', column_family='cf')))
+            assert 'Found 1 sstables that need upgrading.' in out
 
         # Check that sstableupgrade finds no upgrade needed on current version.
         (out, error, rc) = node1.run_sstableupgrade(keyspace='ks', column_family='cf')
-        debug(out)
-        debug(error)
-        self.assertIn('Found 0 sstables that need upgrading.', out)
+        logger.debug(out)
+        logger.debug(error)
+        assert 'Found 0 sstables that need upgrading.' in out
 
     @since('3.0')
-    def sstabledump_test(self):
+    def test_sstabledump(self):
         """
         Test that sstabledump functions properly offline to output the contents of a table.
         """
@@ -405,41 +411,41 @@ class TestOfflineTools(Tester):
         node1.flush()
         cluster.stop()
         [(out, error, rc)] = node1.run_sstabledump(keyspace='ks', column_families=['cf'])
-        debug(out)
-        debug(error)
+        logger.debug(out)
+        logger.debug(error)
 
         # Load the json output and check that it contains the inserted key=1
         s = json.loads(out)
-        debug(s)
-        self.assertEqual(len(s), 2)
+        logger.debug(s)
+        assert len(s) == 2
 
         # order the rows so that we have key=1 first, then key=2
         row0, row1 = s
         (row0, row1) = (row0, row1) if row0['partition']['key'] == ['1'] else (row1, row0)
 
-        self.assertEqual(row0['partition']['key'], ['1'])
+        assert row0['partition']['key'] == ['1']
 
-        self.assertEqual(row1['partition']['key'], ['2'])
-        self.assertIsNotNone(row1['partition'].get('deletion_info'))
-        self.assertIsNotNone(row1.get('rows'))
+        assert row1['partition']['key'] == ['2']
+        assert row1['partition'].get('deletion_info') is not None
+        assert row1.get('rows') is not None
 
         # Check that we only get the key back using the enumerate option
         [(out, error, rc)] = node1.run_sstabledump(keyspace='ks', column_families=['cf'], enumerate_keys=True)
-        debug(out)
-        debug(error)
+        logger.debug(out)
+        logger.debug(error)
         s = json.loads(out)
-        debug(s)
-        self.assertEqual(len(s), 2)
+        logger.debug(s)
+        assert len(s) == 2
         dumped_keys = set(row[0] for row in s)
-        self.assertEqual(set(['1', '2']), dumped_keys)
+        assert {'1', '2'} == dumped_keys
 
     def _check_stderr_error(self, error):
         acceptable = ["Max sstable size of", "Consider adding more capacity", 
"JNA link failure", "Class JavaLaunchHelper is implemented in both"]
 
         if len(error) > 0:
             for line in error.splitlines():
-                self.assertTrue(any([msg in line for msg in acceptable]),
-                                'Found line \n\n"{line}"\n\n in error\n\n{error}'.format(line=line, error=error))
+                assert any([msg in line for msg in acceptable]), \
+                    'Found line \n\n"{line}"\n\n in error\n\n{error}'.format(line=line, error=error)
 
     def _get_final_sstables(self, node, ks, table):
         """
@@ -448,7 +454,7 @@ class TestOfflineTools(Tester):
         file names no longer contain tmp in their names (CASSANDRA-7066).
         """
         # Get all sstable data files
-        allsstables = map(os.path.normcase, node.get_sstables(ks, table))
+        allsstables = list(map(os.path.normcase, node.get_sstables(ks, table)))
 
         # Remove any temporary files
         tool_bin = node.get_tool('sstableutil')
@@ -457,7 +463,7 @@ class TestOfflineTools(Tester):
             env = common.make_cassandra_env(node.get_install_cassandra_root(), node.get_node_cassandra_root())
             p = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             (stdout, stderr) = p.communicate()
-            tmpsstables = map(os.path.normcase, stdout.splitlines())
+            tmpsstables = list(map(os.path.normcase, stdout.decode("utf-8").splitlines()))
 
             ret = list(set(allsstables) - set(tmpsstables))
         else:
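
Several changes in this diff (out.decode("utf-8"), str(e), wrapping map() in list(), and opening sstables with 'rb'/'wb') follow from Python 3's bytes/str split and lazy iterators. A minimal sketch of the subprocess-output handling, using an invented command in place of sstableutil:

    import os
    import subprocess
    import sys

    # Pipes from subprocess yield bytes under Python 3, so decode before
    # calling splitlines() or doing substring checks; map() is lazy, so
    # materialize it with list() before any set arithmetic on the result.
    p = subprocess.Popen([sys.executable, '-c', "print('Keyspace1-Standard1-ka-1-Data.db')"],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    sstables = list(map(os.path.normcase, stdout.decode("utf-8").splitlines()))
    assert sstables == [os.path.normcase('Keyspace1-Standard1-ka-1-Data.db')]
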

