http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/batch_test.py
----------------------------------------------------------------------
diff --git a/batch_test.py b/batch_test.py
index f8b9881..7e7b6b2 100644
--- a/batch_test.py
+++ b/batch_test.py
@@ -1,22 +1,24 @@
 import sys
 import time
-from unittest import skipIf
-from nose.tools import assert_greater_equal
+import pytest
+import logging
 
 from cassandra import ConsistencyLevel, Timeout, Unavailable
 from cassandra.query import SimpleStatement
 
-from dtest import Tester, create_ks, debug
+from dtest import Tester, create_ks
 from tools.assertions import (assert_all, assert_invalid, assert_one,
                               assert_unavailable)
-from tools.decorators import since
 from tools.jmxutils import (JolokiaAgent, make_mbean,
                             remove_perf_disable_shared_mem)
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 
 class TestBatch(Tester):
 
-    def empty_batch_throws_no_error_test(self):
+    def test_empty_batch_throws_no_error(self):
         """
         @jira_ticket CASSANDRA-10711
         """
@@ -26,9 +28,9 @@ class TestBatch(Tester):
             APPLY BATCH;
         """)
         for node in self.cluster.nodelist():
-            self.assertEquals(0, len(node.grep_log_for_errors()))
+            assert 0 == len(node.grep_log_for_errors())
 
-    def counter_batch_accepts_counter_mutations_test(self):
+    def test_counter_batch_accepts_counter_mutations(self):
         """ Test that counter batch accepts counter mutations """
         session = self.prepare()
         session.execute("""
@@ -40,7 +42,7 @@ class TestBatch(Tester):
         """)
         assert_all(session, "SELECT total FROM clicks", [[1], [1], [1]])
 
-    def counter_batch_rejects_regular_mutations_test(self):
+    def test_counter_batch_rejects_regular_mutations(self):
         """ Test that counter batch rejects non-counter mutations """
         session = self.prepare()
         err = "Cannot include non-counter statement in a counter batch"
@@ -54,7 +56,7 @@ class TestBatch(Tester):
             APPLY BATCH
             """, matching=err)
 
-    def logged_batch_accepts_regular_mutations_test(self):
+    def test_logged_batch_accepts_regular_mutations(self):
         """ Test that logged batch accepts regular mutations """
         session = self.prepare()
         session.execute("""
@@ -63,10 +65,10 @@ class TestBatch(Tester):
             INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
             APPLY BATCH
         """)
-        assert_all(session, "SELECT * FROM users", [[1, u'Will', u'Turner'], [0, u'Jack', u'Sparrow']])
+        assert_all(session, "SELECT * FROM users", [[1, 'Will', 'Turner'], [0, 'Jack', 'Sparrow']])
 
     @since('3.0')
-    def logged_batch_gcgs_below_threshold_single_table_test(self):
+    def test_logged_batch_gcgs_below_threshold_single_table(self):
         """ Test that logged batch accepts regular mutations """
         session = self.prepare()
 
@@ -84,11 +86,11 @@ class TestBatch(Tester):
                                  "batchlog entries, so setting 
gc_grace_seconds too low on tables "
                                  "involved in an atomic batch might cause 
batchlog entries to expire "
                                  "before being replayed.")
-        debug(warning)
-        self.assertEquals(1, len(warning), "Cannot find the gc_grace_seconds 
warning message.")
+        logger.debug(warning)
+        assert 1 == len(warning), "Cannot find the gc_grace_seconds warning 
message."
 
     @since('3.0')
-    def logged_batch_gcgs_below_threshold_multi_table_test(self):
+    def test_logged_batch_gcgs_below_threshold_multi_table(self):
         """ Test that logged batch accepts regular mutations """
         session = self.prepare()
         session.execute("ALTER TABLE users WITH gc_grace_seconds = 0")
@@ -111,11 +113,11 @@ class TestBatch(Tester):
                                  "batchlog entries, so setting 
gc_grace_seconds too low on tables "
                                  "involved in an atomic batch might cause 
batchlog entries to expire "
                                  "before being replayed.")
-        debug(warning)
-        self.assertEquals(1, len(warning), "Cannot find the gc_grace_seconds 
warning message.")
+        logger.debug(warning)
+        assert 1 == len(warning), "Cannot find the gc_grace_seconds warning 
message."
 
     @since('3.0')
-    def unlogged_batch_gcgs_below_threshold_should_not_print_warning_test(self):
+    def test_unlogged_batch_gcgs_below_threshold_should_not_print_warning(self):
         """ Test that logged batch accepts regular mutations """
         session = self.prepare()
         session.execute("ALTER TABLE users WITH gc_grace_seconds = 0")
@@ -127,10 +129,10 @@ class TestBatch(Tester):
         """)
         node1 = self.cluster.nodelist()[0]
         warning = node1.grep_log("setting a too low gc_grace_seconds on tables 
involved in an atomic batch")
-        debug(warning)
-        self.assertEquals(0, len(warning), "Cannot find the gc_grace_seconds 
warning message.")
+        logger.debug(warning)
+        assert 0 == len(warning), "Cannot find the gc_grace_seconds warning 
message."
 
-    def logged_batch_rejects_counter_mutations_test(self):
+    def test_logged_batch_rejects_counter_mutations(self):
         """ Test that logged batch rejects counter mutations """
         session = self.prepare()
         err = "Cannot include a counter statement in a logged batch"
@@ -143,7 +145,7 @@ class TestBatch(Tester):
             APPLY BATCH
             """, matching=err)
 
-    def unlogged_batch_accepts_regular_mutations_test(self):
+    def test_unlogged_batch_accepts_regular_mutations(self):
         """ Test that unlogged batch accepts regular mutations """
         session = self.prepare()
         session.execute("""
@@ -152,9 +154,9 @@ class TestBatch(Tester):
             INSERT INTO users (id, firstname, lastname) VALUES (2, 'Elizabeth', 'Swann')
             APPLY BATCH
         """)
-        assert_all(session, "SELECT * FROM users", [[0, u'Jack', u'Sparrow'], [2, u'Elizabeth', u'Swann']])
+        assert_all(session, "SELECT * FROM users", [[0, 'Jack', 'Sparrow'], [2, 'Elizabeth', 'Swann']])
 
-    def unlogged_batch_rejects_counter_mutations_test(self):
+    def test_unlogged_batch_rejects_counter_mutations(self):
         """ Test that unlogged batch rejects counter mutations """
         session = self.prepare()
         err = "Counter and non-counter mutations cannot exist in the same 
batch"
@@ -167,7 +169,7 @@ class TestBatch(Tester):
             APPLY BATCH
             """, matching=err)
 
-    def logged_batch_throws_uae_test(self):
+    def test_logged_batch_throws_uae(self):
         """ Test that logged batch throws UAE if there aren't enough live 
nodes """
         session = self.prepare(nodes=3)
        [node.stop(wait_other_notice=True) for node in self.cluster.nodelist()[1:]]
@@ -179,7 +181,7 @@ class TestBatch(Tester):
             APPLY BATCH
         """)
 
-    def logged_batch_doesnt_throw_uae_test(self):
+    def test_logged_batch_doesnt_throw_uae(self):
         """ Test that logged batch DOES NOT throw UAE if there are at least 2 
live nodes """
         session = self.prepare(nodes=3)
         self.cluster.nodelist()[-1].stop(wait_other_notice=True)
@@ -192,10 +194,10 @@ class TestBatch(Tester):
         session.execute(query)
 
        self.cluster.nodelist()[-1].start(wait_for_binary_proto=True, wait_other_notice=True)
-        assert_all(session, "SELECT * FROM users", [[1, u'Will', u'Turner'], [0, u'Jack', u'Sparrow']],
+        assert_all(session, "SELECT * FROM users", [[1, 'Will', 'Turner'], [0, 'Jack', 'Sparrow']],
                    cl=ConsistencyLevel.ALL)
 
-    def acknowledged_by_batchlog_not_set_when_batchlog_write_fails_test(self):
+    def test_acknowledged_by_batchlog_not_set_when_batchlog_write_fails(self):
         """ Test that acknowledged_by_batchlog is False if batchlog can't be 
written """
         session = self.prepare(nodes=3, compression=False)
         # kill 2 of the 3 nodes (all the batchlog write candidates).
@@ -207,7 +209,7 @@ class TestBatch(Tester):
             APPLY BATCH
         """, ConsistencyLevel.ONE, received_responses=0)
 
-    def acknowledged_by_batchlog_set_when_batchlog_write_succeeds_test(self):
+    def test_acknowledged_by_batchlog_set_when_batchlog_write_succeeds(self):
         """ Test that acknowledged_by_batchlog is True if batchlog can be 
written """
         session = self.prepare(nodes=3, compression=False)
        # kill one of the nodes so that batchlog will be written, but the write will fail.
@@ -219,7 +221,7 @@ class TestBatch(Tester):
             APPLY BATCH
         """, ConsistencyLevel.THREE, received_responses=2)
 
-    def batch_uses_proper_timestamp_test(self):
+    def test_batch_uses_proper_timestamp(self):
         """ Test that each statement will be executed with provided BATCH 
timestamp """
         session = self.prepare()
         session.execute("""
@@ -231,7 +233,7 @@ class TestBatch(Tester):
         query = "SELECT id, writetime(firstname), writetime(lastname) FROM 
users"
         assert_all(session, query, [[1, 1111111111111111, 1111111111111111], 
[0, 1111111111111111, 1111111111111111]])
 
-    def only_one_timestamp_is_valid_test(self):
+    def test_only_one_timestamp_is_valid(self):
         """ Test that TIMESTAMP must not be used in the statements within the 
batch. """
         session = self.prepare()
         assert_invalid(session, """
@@ -241,7 +243,7 @@ class TestBatch(Tester):
             APPLY BATCH
         """, matching="Timestamp must be set either on BATCH or individual 
statements")
 
-    def each_statement_in_batch_uses_proper_timestamp_test(self):
+    def test_each_statement_in_batch_uses_proper_timestamp(self):
         """ Test that each statement will be executed with its own timestamp 
"""
         session = self.prepare()
         session.execute("""
@@ -254,9 +256,8 @@ class TestBatch(Tester):
         query = "SELECT id, writetime(firstname), writetime(lastname) FROM 
users"
         assert_all(session, query, [[1, 1111111111111112, 1111111111111112], 
[0, 1111111111111111, 1111111111111111]])
 
-    def multi_table_batch_for_10554_test(self):
+    def test_multi_table_batch_for_10554(self):
         """ Test a batch on 2 tables having different columns, restarting the 
node afterwards, to reproduce CASSANDRA-10554 """
-
         session = self.prepare()
 
        # prepare() adds users and clicks but clicks is a counter table, so adding a random other table for this test.
@@ -289,7 +290,7 @@ class TestBatch(Tester):
         assert_one(session, "SELECT * FROM dogs", [0, 'Pluto'])
 
     @since('3.0', max_version='3.x')
-    def logged_batch_compatibility_1_test(self):
+    def test_logged_batch_compatibility_1(self):
         """
        @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster.
 
@@ -298,7 +299,7 @@ class TestBatch(Tester):
        self._logged_batch_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4)
 
     @since('3.0', max_version='3.x')
-    def batchlog_replay_compatibility_1_test(self):
+    def test_batchlog_replay_compatibility_1(self):
         """
        @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster.
 
@@ -307,8 +308,8 @@ class TestBatch(Tester):
        self._batchlog_replay_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4)
 
     @since('3.0', max_version='3.x')
-    @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+')
-    def logged_batch_compatibility_2_test(self):
+    @pytest.mark.skipif(sys.platform == 'win32', reason='Windows production support only on 2.2+')
+    def test_logged_batch_compatibility_2(self):
         """
        @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster.
 
@@ -317,8 +318,8 @@ class TestBatch(Tester):
        self._logged_batch_compatibility_test(0, 1, 'github:apache/cassandra-2.1', 2, 3)
 
     @since('3.0', max_version='3.x')
-    @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+')
-    def logged_batch_compatibility_3_test(self):
+    @pytest.mark.skipif(sys.platform == 'win32', reason='Windows production support only on 2.2+')
+    def test_logged_batch_compatibility_3(self):
         """
        @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster.
 
@@ -327,7 +328,7 @@ class TestBatch(Tester):
        self._logged_batch_compatibility_test(0, 2, 'github:apache/cassandra-2.1', 1, 3)
 
     @since('3.0', max_version='3.x')
-    def logged_batch_compatibility_4_test(self):
+    def test_logged_batch_compatibility_4(self):
         """
        @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster.
 
@@ -336,7 +337,7 @@ class TestBatch(Tester):
        self._logged_batch_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4)
 
     @since('3.0', max_version='3.x')
-    def batchlog_replay_compatibility_4_test(self):
+    def test_batchlog_replay_compatibility_4(self):
         """
        @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster.
 
@@ -345,8 +346,8 @@ class TestBatch(Tester):
        self._batchlog_replay_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4)
 
     @since('3.0', max_version='3.x')
-    @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+')
-    def logged_batch_compatibility_5_test(self):
+    @pytest.mark.skipif(sys.platform == 'win32', reason='Windows production support only on 2.2+')
+    def test_logged_batch_compatibility_5(self):
         """
        @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster.
 
@@ -365,7 +366,7 @@ class TestBatch(Tester):
         session.execute(query)
         rows = session.execute("SELECT id, firstname, lastname FROM users")
         res = sorted(rows)
-        self.assertEquals([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(res[0]), list(res[1])])
+        assert [[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']] == [list(res[0]), list(res[1])]
 
    def _batchlog_replay_compatibility_test(self, coordinator_idx, current_nodes, previous_version, previous_nodes, protocol_version):
        session = self.prepare_mixed(coordinator_idx, current_nodes, previous_version, previous_nodes,
@@ -373,7 +374,7 @@ class TestBatch(Tester):
 
         coordinator = self.cluster.nodelist()[coordinator_idx]
         coordinator.byteman_submit(['./byteman/fail_after_batchlog_write.btm'])
-        debug("Injected byteman scripts to enable batchlog replay 
{}".format(coordinator.name))
+        logger.debug("Injected byteman scripts to enable batchlog replay 
{}".format(coordinator.name))
 
         query = """
             BEGIN BATCH
@@ -387,7 +388,7 @@ class TestBatch(Tester):
        # 2 * write_request_timeout_in_ms ms: 1x timeout for all mutations to be written,
        # and another 1x timeout for batch remove mutation to be received.
        delay = 2 * coordinator.get_conf_option('write_request_timeout_in_ms') / 1000.0 + 1
-        debug('Sleeping for {}s for the batches to not be skipped'.format(delay))
+        logger.debug('Sleeping for {}s for the batches to not be skipped'.format(delay))
         time.sleep(delay)
 
         total_batches_replayed = 0
@@ -398,18 +399,18 @@ class TestBatch(Tester):
                 continue
 
             with JolokiaAgent(n) as jmx:
-                debug('Forcing batchlog replay for {}'.format(n.name))
+                logger.debug('Forcing batchlog replay for {}'.format(n.name))
                 jmx.execute_method(blm, 'forceBatchlogReplay')
                batches_replayed = jmx.read_attribute(blm, 'TotalBatchesReplayed')
-                debug('{} batches replayed on node {}'.format(batches_replayed, n.name))
+                logger.debug('{} batches replayed on node {}'.format(batches_replayed, n.name))
                 total_batches_replayed += batches_replayed
 
-        assert_greater_equal(total_batches_replayed, 2)
+        assert total_batches_replayed >= 2
 
         for node in self.cluster.nodelist():
            session = self.patient_exclusive_cql_connection(node, protocol_version=protocol_version)
            rows = sorted(session.execute('SELECT id, firstname, lastname FROM ks.users'))
-            self.assertEqual([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(rows[0]), list(rows[1])])
+            assert [[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']] == [list(rows[0]), list(rows[1])]
 
     def assert_timedout(self, session, query, cl, acknowledged_by=None,
                         received_responses=None):
@@ -420,12 +421,12 @@ class TestBatch(Tester):
             if received_responses is not None:
                 msg = "Expecting received_responses to be {}, got: {}".format(
                     received_responses, e.received_responses,)
-                self.assertEqual(e.received_responses, received_responses, msg)
+                assert e.received_responses == received_responses, msg
         except Unavailable as e:
             if received_responses is not None:
                 msg = "Expecting alive_replicas to be {}, got: {}".format(
                     received_responses, e.alive_replicas,)
-                self.assertEqual(e.alive_replicas, received_responses, msg)
+                assert e.alive_replicas == received_responses, msg
         except Exception as e:
             assert False, "Expecting TimedOutException, got:" + str(e)
         else:
@@ -434,7 +435,7 @@ class TestBatch(Tester):
    def prepare(self, nodes=1, compression=True, version=None, protocol_version=None, install_byteman=False):
         if version:
             self.cluster.set_install_dir(version=version)
-            debug("Set cassandra dir to 
{}".format(self.cluster.get_install_dir()))
+            logger.debug("Set cassandra dir to 
{}".format(self.cluster.get_install_dir()))
 
         self.cluster.populate(nodes, install_byteman=install_byteman)
 
@@ -449,7 +450,7 @@ class TestBatch(Tester):
         return session
 
     def create_schema(self, session, rf):
-        debug('Creating schema...')
+        logger.debug('Creating schema...')
         create_ks(session, 'ks', rf)
 
         session.execute("""
@@ -472,19 +473,22 @@ class TestBatch(Tester):
 
         time.sleep(.5)
 
-    def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True, protocol_version=None, install_byteman=False):
-        debug("Testing with {} node(s) at version '{}', {} node(s) at current version"
+    def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True,
+                      protocol_version=None, install_byteman=False):
+        logger.debug("Testing with {} node(s) at version '{}', {} node(s) at current version"
              .format(previous_nodes, previous_version, current_nodes))
 
         # start a cluster using the previous version
-        self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version, install_byteman=install_byteman)
+        self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version,
+                     install_byteman=install_byteman)
 
        # then upgrade the current nodes to the current version but not the previous nodes
-        for i in xrange(current_nodes):
+        for i in range(current_nodes):
             node = self.cluster.nodelist()[i]
             self.upgrade_node(node)
 
-        session = self.patient_exclusive_cql_connection(self.cluster.nodelist()[coordinator_idx], protocol_version=protocol_version)
+        session = self.patient_exclusive_cql_connection(self.cluster.nodelist()[coordinator_idx],
+                                                        protocol_version=protocol_version)
         session.execute('USE ks')
         return session
 
@@ -492,13 +496,13 @@ class TestBatch(Tester):
         """
         Upgrade a node to the current version
         """
-        debug('Upgrading {} to the current version'.format(node.name))
-        debug('Shutting down {}'.format(node.name))
+        logger.debug('Upgrading {} to the current version'.format(node.name))
+        logger.debug('Shutting down {}'.format(node.name))
         node.stop(wait_other_notice=False)
         self.set_node_to_current_version(node)
-        debug("Set cassandra dir for {} to {}".format(node.name, 
node.get_install_dir()))
+        logger.debug("Set cassandra dir for {} to {}".format(node.name, 
node.get_install_dir()))
         # needed for jmx
         remove_perf_disable_shared_mem(node)
         # Restart nodes on new version
-        debug('Starting {} on new version ({})'.format(node.name, node.get_cassandra_version()))
+        logger.debug('Starting {} on new version ({})'.format(node.name, node.get_cassandra_version()))
         node.start(wait_other_notice=True, wait_for_binary_proto=True)
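
Note for reviewers: the conversion above swaps nose/unittest assertion helpers
for bare pytest asserts. A minimal, self-contained sketch of the pattern (the
variable names below are illustrative, not taken from the diff):

    # unittest/nose style being removed:
    #     self.assertEquals(expected, actual)
    #     assert_greater_equal(total, 2)          # from nose.tools
    # pytest style being added: plain asserts with comparison operators,
    # which pytest rewrites to show both operands on failure.
    rows = [[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']]
    assert [[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']] == rows

    total_batches_replayed = 2
    assert total_batches_replayed >= 2

    # Pitfall: "assert a, b" asserts only a and treats b as the failure
    # message, so a mistranslated "assert expected, actual" always passes
    # whenever expected is non-empty; the comparison must use ==.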

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/bin/collect_known_failures.py
----------------------------------------------------------------------
diff --git a/bin/collect_known_failures.py b/bin/collect_known_failures.py
deleted file mode 100644
index cf66cd1..0000000
--- a/bin/collect_known_failures.py
+++ /dev/null
@@ -1,58 +0,0 @@
-"""
-A script that runs the tests with --collect-only, but instead of just printing
-the tests' names, prints the information added by the tools.decorators.known_failure
-decorator.
-
-This is basically a wrapper around the `nosetests` command, so it takes the
-same arguments, though it appends some arguments to sys.argv. In particular,
-if you want to look at particular kinds of known failures, use the `-a`
-parameter on this script as you would for any of the known_failures attributes.
-In addition, you should call it from the same directory from which you'd call
-`nosetests`.
-"""
-
-import json
-import os
-import sys
-from functools import partial
-
-import nose
-
-
-class PrintJiraURLPlugin(nose.plugins.Plugin):
-    enabled = True
-
-    def options(self, parser, env):
-        super(PrintJiraURLPlugin, self).configure(parser, env)
-
-    def testName(self, test):
-        _, test_module, test_name = test.address()
-        test_method_name = test_name.split('.')[-1]
-        test_method = getattr(test.test, test_method_name)
-
-        get_attr_for_current_method = partial(
-            nose.plugins.attrib.get_method_attr,
-            method=test_method,
-            cls=test.test,
-        )
-
-        failure_annotations = get_attr_for_current_method(attr_name='known_failure')
-
-        return json.dumps({
-            'module': test_module,
-            'name': test_name,
-            'failure_annotations': failure_annotations
-        })
-
-
-if __name__ == '__main__':
-    argv = sys.argv + ['--collect-only', '-v']
-
-    # The tests need a CASSANDRA_VERSION or CASSANDRA_DIR environment variable
-    # to run at all, so we specify it here. However, we have to do so by
-    # modifying os.environ, rather than using the env parameter to nose.main,
-    # because env does not do what you think it does:
-    # http://stackoverflow.com/a/28611124
-    os.environ['CASSANDRA_VERSION'] = 'git:trunk'
-
-    nose.main(addplugins=[PrintJiraURLPlugin()], argv=argv)
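
Note: pytest has its own --collect-only mode, so a rough stand-in for this
deleted nose wrapper could look like the sketch below. This is an assumption,
not part of this commit; it presumes collection still requires
CASSANDRA_VERSION or CASSANDRA_DIR to be set.

    import os
    import sys

    import pytest

    if __name__ == '__main__':
        # Collection imports the test modules, so a version/dir is needed.
        os.environ.setdefault('CASSANDRA_VERSION', 'git:trunk')
        # Defer to pytest's built-in collection; extra CLI args pass through.
        sys.exit(pytest.main(['--collect-only', '-q'] + sys.argv[1:]))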

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/bootstrap_test.py
----------------------------------------------------------------------
diff --git a/bootstrap_test.py b/bootstrap_test.py
index efa84ec..22dddcd 100644
--- a/bootstrap_test.py
+++ b/bootstrap_test.py
@@ -5,34 +5,40 @@ import shutil
 import tempfile
 import threading
 import time
+import logging
+import signal
 
 from cassandra import ConsistencyLevel
 from cassandra.concurrent import execute_concurrent_with_args
 from ccmlib.node import NodeError
 
-from dtest import DISABLE_VNODES, Tester, debug, create_ks, create_cf
+import pytest
+
+from dtest import Tester, create_ks, create_cf
 from tools.assertions import (assert_almost_equal, assert_bootstrap_state, assert_not_running,
                               assert_one, assert_stderr_clean)
 from tools.data import query_c1c2
-from tools.decorators import no_vnodes, since
 from tools.intervention import InterruptBootstrap, KillOnBootstrap
 from tools.misc import new_node
-from tools.misc import generate_ssl_stores
-
-
-class BaseBootstrapTest(Tester):
-    __test__ = False
-
-    allow_log_errors = True
-    ignore_log_patterns = (
-        # This one occurs when trying to send the migration to a
-        # node that hasn't started yet, and when it does, it gets
-        # replayed and everything is fine.
-        r'Can\'t send migration request: node.*is down',
-        # ignore streaming error during bootstrap
-        r'Exception encountered during startup',
-        r'Streaming error occurred'
-    )
+from tools.misc import generate_ssl_stores, retry_till_success
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
+class TestBootstrap(Tester):
+
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.allow_log_errors = True
+        fixture_dtest_setup.ignore_log_patterns = (
+            # This one occurs when trying to send the migration to a
+            # node that hasn't started yet, and when it does, it gets
+            # replayed and everything is fine.
+            r'Can\'t send migration request: node.*is down',
+            # ignore streaming error during bootstrap
+            r'Exception encountered during startup',
+            r'Streaming error occurred'
+        )
 
     def _base_bootstrap_test(self, bootstrap=None, bootstrap_from_version=None,
                              enable_ssl=None):
@@ -48,14 +54,14 @@ class BaseBootstrapTest(Tester):
         cluster = self.cluster
 
         if enable_ssl:
-            debug("***using internode ssl***")
-            generate_ssl_stores(self.test_path)
-            cluster.enable_internode_ssl(self.test_path)
+            logger.debug("***using internode ssl***")
+            generate_ssl_stores(self.fixture_dtest_setup.test_path)
+            cluster.enable_internode_ssl(self.fixture_dtest_setup.test_path)
 
         tokens = cluster.balanced_tokens(2)
         cluster.set_configuration_options(values={'num_tokens': 1})
 
-        debug("[node1, node2] tokens: %r" % (tokens,))
+        logger.debug("[node1, node2] tokens: %r" % (tokens,))
 
         keys = 10000
 
@@ -63,7 +69,7 @@ class BaseBootstrapTest(Tester):
         cluster.populate(1)
         node1 = cluster.nodelist()[0]
         if bootstrap_from_version:
-            debug("starting source node on version 
{}".format(bootstrap_from_version))
+            logger.debug("starting source node on version 
{}".format(bootstrap_from_version))
             node1.set_install_dir(version=bootstrap_from_version)
         node1.set_configuration_options(values={'initial_token': tokens[0]})
         cluster.start(wait_other_notice=True)
@@ -74,7 +80,7 @@ class BaseBootstrapTest(Tester):
 
         # record the size before inserting any of our own data
         empty_size = node1.data_size()
-        debug("node1 empty size : %s" % float(empty_size))
+        logger.debug("node1 empty size : %s" % float(empty_size))
 
         insert_statement = session.prepare("INSERT INTO ks.cf (key, c1, c2) 
VALUES (?, 'value1', 'value2')")
         execute_concurrent_with_args(session, insert_statement, [['k%d' % k] 
for k in range(keys)])
@@ -82,25 +88,23 @@ class BaseBootstrapTest(Tester):
         node1.flush()
         node1.compact()
         initial_size = node1.data_size()
-        debug("node1 size before bootstrapping node2: %s" % 
float(initial_size))
+        logger.debug("node1 size before bootstrapping node2: %s" % 
float(initial_size))
 
         # Reads inserted data all during the bootstrap process. We shouldn't
         # get any error
-        reader = self.go(lambda _: query_c1c2(session, random.randint(0, keys - 1), ConsistencyLevel.ONE))
+        query_c1c2(session, random.randint(0, keys - 1), ConsistencyLevel.ONE)
+        session.shutdown()
 
         # Bootstrapping a new node in the current version
         node2 = bootstrap(cluster, tokens[1])
         node2.compact()
 
-        reader.check()
         node1.cleanup()
-        debug("node1 size after cleanup: %s" % float(node1.data_size()))
+        logger.debug("node1 size after cleanup: %s" % float(node1.data_size()))
         node1.compact()
-        debug("node1 size after compacting: %s" % float(node1.data_size()))
-        time.sleep(.5)
-        reader.check()
+        logger.debug("node1 size after compacting: %s" % 
float(node1.data_size()))
 
-        debug("node2 size after compacting: %s" % float(node2.data_size()))
+        logger.debug("node2 size after compacting: %s" % 
float(node2.data_size()))
 
         size1 = float(node1.data_size())
         size2 = float(node2.data_size())
@@ -108,40 +112,34 @@ class BaseBootstrapTest(Tester):
        assert_almost_equal(float(initial_size - empty_size), 2 * (size1 - float(empty_size)))
 
         assert_bootstrap_state(self, node2, 'COMPLETED')
-        if bootstrap_from_version:
-            self.assertTrue(node2.grep_log('does not support keep-alive', filename='debug.log'))
 
-
-class TestBootstrap(BaseBootstrapTest):
-    __test__ = True
-
-    @no_vnodes()
-    def simple_bootstrap_test_with_ssl(self):
+    @pytest.mark.no_vnodes
+    def test_simple_bootstrap_with_ssl(self):
         self._base_bootstrap_test(enable_ssl=True)
 
-    @no_vnodes()
-    def simple_bootstrap_test(self):
+    @pytest.mark.no_vnodes
+    def test_simple_bootstrap(self):
         self._base_bootstrap_test()
 
-    @no_vnodes()
-    def bootstrap_on_write_survey_test(self):
+    @pytest.mark.no_vnodes
+    def test_bootstrap_on_write_survey(self):
         def bootstrap_on_write_survey_and_join(cluster, token):
             node2 = new_node(cluster)
             node2.set_configuration_options(values={'initial_token': token})
            node2.start(jvm_args=["-Dcassandra.write_survey=true"], wait_for_binary_proto=True)
 
-            self.assertTrue(len(node2.grep_log('Startup complete, but write survey mode is active, not becoming an active ring member.')))
+            assert len(node2.grep_log('Startup complete, but write survey mode is active, not becoming an active ring member.'))
             assert_bootstrap_state(self, node2, 'IN_PROGRESS')
 
             node2.nodetool("join")
-            self.assertTrue(len(node2.grep_log('Leaving write survey mode and joining ring at operator request')))
+            assert len(node2.grep_log('Leaving write survey mode and joining ring at operator request'))
             return node2
 
         self._base_bootstrap_test(bootstrap_on_write_survey_and_join)
 
     @since('3.10')
-    @no_vnodes()
-    def simple_bootstrap_test_small_keepalive_period(self):
+    @pytest.mark.no_vnodes
+    def test_simple_bootstrap_small_keepalive_period(self):
         """
         @jira_ticket CASSANDRA-11841
        Test that bootstrap completes if it takes longer than streaming_socket_timeout_in_ms or
@@ -157,7 +155,7 @@ class TestBootstrap(BaseBootstrapTest):
         cluster.populate(1)
         node1 = cluster.nodelist()[0]
 
-        debug("Setting up byteman on {}".format(node1.name))
+        logger.debug("Setting up byteman on {}".format(node1.name))
         # set up byteman
         node1.byteman_port = '8100'
         node1.import_config_files()
@@ -169,7 +167,7 @@ class TestBootstrap(BaseBootstrapTest):
                      'compaction(strategy=SizeTieredCompactionStrategy, enabled=false)'])
         cluster.flush()
 
-        debug("Submitting byteman script to {} to".format(node1.name))
+        logger.debug("Submitting byteman script to {} to".format(node1.name))
        # Sleep longer than streaming_socket_timeout_in_ms to make sure the node will not be killed
         node1.byteman_submit(['./byteman/stream_5s_sleep.btm'])
 
@@ -181,16 +179,15 @@ class TestBootstrap(BaseBootstrapTest):
         assert_bootstrap_state(self, node2, 'COMPLETED')
 
         for node in cluster.nodelist():
-            self.assertTrue(node.grep_log('Scheduling keep-alive task with 2s period.', filename='debug.log'))
-            self.assertTrue(node.grep_log('Sending keep-alive', filename='debug.log'))
-            self.assertTrue(node.grep_log('Received keep-alive', filename='debug.log'))
+            assert node.grep_log('Scheduling keep-alive task with 2s period.', filename='debug.log')
+            assert node.grep_log('Sending keep-alive', filename='debug.log')
+            assert node.grep_log('Received keep-alive', filename='debug.log')
 
-    def simple_bootstrap_test_nodata(self):
+    def test_simple_bootstrap_nodata(self):
         """
         @jira_ticket CASSANDRA-11010
         Test that bootstrap completes if streaming from nodes with no data
         """
-
         cluster = self.cluster
         # Create a two-node cluster
         cluster.populate(2)
@@ -202,7 +199,7 @@ class TestBootstrap(BaseBootstrapTest):
 
         assert_bootstrap_state(self, node3, 'COMPLETED')
 
-    def read_from_bootstrapped_node_test(self):
+    def test_read_from_bootstrapped_node(self):
         """
         Test bootstrapped node sees existing data
         @jira_ticket CASSANDRA-6648
@@ -223,18 +220,18 @@ class TestBootstrap(BaseBootstrapTest):
 
         session = self.patient_exclusive_cql_connection(node4)
         new_rows = list(session.execute("SELECT * FROM %s" % (stress_table,)))
-        self.assertEquals(original_rows, new_rows)
+        assert original_rows == new_rows
 
-    def consistent_range_movement_true_with_replica_down_should_fail_test(self):
+    def test_consistent_range_movement_true_with_replica_down_should_fail(self):
         self._bootstrap_test_with_replica_down(True)
 
-    def consistent_range_movement_false_with_replica_down_should_succeed_test(self):
+    def test_consistent_range_movement_false_with_replica_down_should_succeed(self):
         self._bootstrap_test_with_replica_down(False)
 
-    def consistent_range_movement_true_with_rf1_should_fail_test(self):
+    def test_consistent_range_movement_true_with_rf1_should_fail(self):
         self._bootstrap_test_with_replica_down(True, rf=1)
 
-    def consistent_range_movement_false_with_rf1_should_succeed_test(self):
+    def test_consistent_range_movement_false_with_rf1_should_succeed(self):
         self._bootstrap_test_with_replica_down(False, rf=1)
 
    def _bootstrap_test_with_replica_down(self, consistent_range_movement, rf=2):
@@ -249,10 +246,10 @@ class TestBootstrap(BaseBootstrapTest):
 
         node3_token = None
         # Make token assignment deterministic
-        if DISABLE_VNODES:
+        if not self.dtest_config.use_vnodes:
             cluster.set_configuration_options(values={'num_tokens': 1})
             tokens = cluster.balanced_tokens(3)
-            debug("non-vnode tokens: %r" % (tokens,))
+            logger.debug("non-vnode tokens: %r" % (tokens,))
            node1.set_configuration_options(values={'initial_token': tokens[0]})
            node2.set_configuration_options(values={'initial_token': tokens[2]})
             node3_token = tokens[1]  # Add node 3 between node1 and node2
@@ -283,7 +280,7 @@ class TestBootstrap(BaseBootstrapTest):
            # with rf=1 and cassandra.consistent.rangemovement=false, missing sources are ignored
            if not consistent_range_movement and rf == 1:
                node3.watch_log_for("Unable to find sufficient sources for streaming range")
-            self.assertTrue(node3.is_running())
+            assert node3.is_running()
             assert_bootstrap_state(self, node3, 'COMPLETED')
         else:
             if consistent_range_movement:
@@ -293,11 +290,10 @@ class TestBootstrap(BaseBootstrapTest):
             assert_not_running(node3)
 
     @since('2.2')
-    def resumable_bootstrap_test(self):
+    def test_resumable_bootstrap(self):
         """
         Test resuming bootstrap after data streaming failure
         """
-
         cluster = self.cluster
         cluster.populate(2)
 
@@ -323,7 +319,7 @@ class TestBootstrap(BaseBootstrapTest):
         node3.watch_log_for("Starting listening for CQL clients")
         mark = node3.mark_log()
         # check if node3 is still in bootstrap mode
-        assert_bootstrap_state(self, node3, 'IN_PROGRESS')
+        retry_till_success(assert_bootstrap_state, tester=self, node=node3, expected_bootstrap_state='IN_PROGRESS', timeout=120)
 
        # bring back node1 and invoke nodetool bootstrap to resume bootstrapping
         node3.nodetool('bootstrap resume')
@@ -334,17 +330,16 @@ class TestBootstrap(BaseBootstrapTest):
         # cleanup to guarantee each node will only have sstables of its ranges
         cluster.cleanup()
 
-        debug("Check data is present")
+        logger.debug("Check data is present")
         # Let's check stream bootstrap completely transferred data
        stdout, stderr, _ = node3.stress(['read', 'n=1k', 'no-warmup', '-schema', 'replication(factor=2)', '-rate', 'threads=8'])
 
         if stdout is not None:
-            self.assertNotIn("FAILURE", stdout)
+            assert "FAILURE" not in stdout.decode("utf-8")
 
     @since('2.2')
-    def bootstrap_with_reset_bootstrap_state_test(self):
+    def test_bootstrap_with_reset_bootstrap_state(self):
         """Test bootstrap with resetting bootstrap progress"""
-
        cluster = self.cluster
        cluster.set_configuration_options(values={'stream_throughput_outbound_megabits_per_sec': 1})
         cluster.populate(2).start(wait_other_notice=True)
@@ -367,7 +362,7 @@ class TestBootstrap(BaseBootstrapTest):
         node1.start()
 
         # restart node3 bootstrap with resetting bootstrap progress
-        node3.stop()
+        node3.stop(signal_event=signal.SIGKILL)
         mark = node3.mark_log()
         node3.start(jvm_args=["-Dcassandra.reset_bootstrap_progress=true"])
         # check if we reset bootstrap state
@@ -378,7 +373,7 @@ class TestBootstrap(BaseBootstrapTest):
         # check if 2nd bootstrap succeeded
         assert_bootstrap_state(self, node3, 'COMPLETED')
 
-    def manual_bootstrap_test(self):
+    def test_manual_bootstrap(self):
         """
            Test adding a new node and bootstrapping it manually. No auto_bootstrap.
            This test also verify that all data are OK after the addition of the new node.
@@ -403,14 +398,13 @@ class TestBootstrap(BaseBootstrapTest):
         node1.cleanup()
 
         current_rows = list(session.execute("SELECT * FROM %s" % stress_table))
-        self.assertEquals(original_rows, current_rows)
+        assert original_rows == current_rows
 
-    def local_quorum_bootstrap_test(self):
+    def test_local_quorum_bootstrap(self):
         """
         Test that CL local_quorum works while a node is bootstrapping.
         @jira_ticket CASSANDRA-8058
         """
-
         cluster = self.cluster
         cluster.populate([1, 1])
         cluster.start()
@@ -453,16 +447,16 @@ class TestBootstrap(BaseBootstrapTest):
                                         '-rate', 'threads=5',
                                         '-errors', 'retries=2'])
 
-        debug(out)
+        logger.debug(out)
         assert_stderr_clean(err)
         regex = re.compile("Operation.+error inserting key.+Exception")
-        failure = regex.search(out)
-        self.assertIsNone(failure, "Error during stress while bootstrapping")
+        failure = regex.search(str(out))
+        assert failure is None, "Error during stress while bootstrapping"
 
-    def shutdown_wiped_node_cannot_join_test(self):
+    def test_shutdown_wiped_node_cannot_join(self):
         self._wiped_node_cannot_join_test(gently=True)
 
-    def killed_wiped_node_cannot_join_test(self):
+    def test_killed_wiped_node_cannot_join(self):
         self._wiped_node_cannot_join_test(gently=False)
 
     def _wiped_node_cannot_join_test(self, gently):
@@ -490,7 +484,7 @@ class TestBootstrap(BaseBootstrapTest):
         node4.start(wait_for_binary_proto=True)
 
         session = self.patient_cql_connection(node4)
-        self.assertEquals(original_rows, list(session.execute("SELECT * FROM {}".format(stress_table,))))
+        assert original_rows == list(session.execute("SELECT * FROM {}".format(stress_table,)))
 
         # Stop the new node and wipe its data
         node4.stop(gently=gently)
@@ -500,7 +494,7 @@ class TestBootstrap(BaseBootstrapTest):
         node4.start(no_wait=True, wait_other_notice=False)
         node4.watch_log_for("A node with address {} already exists, cancelling 
join".format(node4.address_for_current_version_slashy()), from_mark=mark)
 
-    def decommissioned_wiped_node_can_join_test(self):
+    def test_decommissioned_wiped_node_can_join(self):
         """
         @jira_ticket CASSANDRA-9765
        Test that if we decommission a node and then wipe its data, it can join the cluster.
@@ -523,7 +517,7 @@ class TestBootstrap(BaseBootstrapTest):
         node4.start(wait_for_binary_proto=True, wait_other_notice=True)
 
         session = self.patient_cql_connection(node4)
-        self.assertEquals(original_rows, list(session.execute("SELECT * FROM {}".format(stress_table,))))
+        assert original_rows == list(session.execute("SELECT * FROM {}".format(stress_table,)))
 
         # Decommission the new node and wipe its data
         node4.decommission()
@@ -534,7 +528,7 @@ class TestBootstrap(BaseBootstrapTest):
         node4.start(wait_other_notice=True)
         node4.watch_log_for("JOINING:", from_mark=mark)
 
-    def decommissioned_wiped_node_can_gossip_to_single_seed_test(self):
+    def test_decommissioned_wiped_node_can_gossip_to_single_seed(self):
         """
         @jira_ticket CASSANDRA-8072
         @jira_ticket CASSANDRA-8422
@@ -559,26 +553,26 @@ class TestBootstrap(BaseBootstrapTest):
         session.execute("ALTER KEYSPACE system_traces WITH REPLICATION = 
{'class':'SimpleStrategy', 'replication_factor':'1'};")
 
         # Decommision the new node and kill it
-        debug("Decommissioning & stopping node2")
+        logger.debug("Decommissioning & stopping node2")
         node2.decommission()
         node2.stop(wait_other_notice=False)
 
         # Wipe its data
         for data_dir in node2.data_directories():
-            debug("Deleting {}".format(data_dir))
+            logger.debug("Deleting {}".format(data_dir))
             shutil.rmtree(data_dir)
 
         commitlog_dir = os.path.join(node2.get_path(), 'commitlogs')
-        debug("Deleting {}".format(commitlog_dir))
+        logger.debug("Deleting {}".format(commitlog_dir))
         shutil.rmtree(commitlog_dir)
 
         # Now start it, it should be allowed to join
         mark = node2.mark_log()
-        debug("Restarting wiped node2")
+        logger.debug("Restarting wiped node2")
         node2.start(wait_other_notice=False)
         node2.watch_log_for("JOINING:", from_mark=mark)
 
-    def failed_bootstrap_wiped_node_can_join_test(self):
+    def test_failed_bootstrap_wiped_node_can_join(self):
         """
         @jira_ticket CASSANDRA-9765
        Test that if a node fails to bootstrap, it can join the cluster even if the data is wiped.
@@ -607,7 +601,7 @@ class TestBootstrap(BaseBootstrapTest):
 
         node2.start()
         t.join()
-        self.assertFalse(node2.is_running())
+        assert not node2.is_running()
 
         # wipe any data for node2
         self._cleanup(node2)
@@ -617,7 +611,7 @@ class TestBootstrap(BaseBootstrapTest):
         node2.watch_log_for("JOINING:", from_mark=mark)
 
     @since('2.1.1')
-    def simultaneous_bootstrap_test(self):
+    def test_simultaneous_bootstrap(self):
         """
        Attempt to bootstrap two nodes at once, to assert the second bootstrapped node fails, and does not interfere.
 
@@ -660,7 +654,7 @@ class TestBootstrap(BaseBootstrapTest):
         # Repeat the select count(*) query, to help catch
         # bugs like 9484, where count(*) fails at higher
         # data loads.
-        for _ in xrange(5):
+        for _ in range(5):
             assert_one(session, "SELECT count(*) from keyspace1.standard1", 
[500000], cl=ConsistencyLevel.ONE)
 
     def test_cleanup(self):
@@ -673,7 +667,7 @@ class TestBootstrap(BaseBootstrapTest):
         cluster.populate(1)
         cluster.start(wait_for_binary_proto=True)
         node1, = cluster.nodelist()
-        for x in xrange(0, 5):
+        for x in range(0, 5):
            node1.stress(['write', 'n=100k', 'no-warmup', '-schema', 'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)', 'replication(factor=1)', '-rate', 'threads=10'])
             node1.flush()
         node2 = new_node(cluster)
@@ -682,20 +676,21 @@ class TestBootstrap(BaseBootstrapTest):
         failed = threading.Event()
         jobs = 1
        thread = threading.Thread(target=self._monitor_datadir, args=(node1, event, len(node1.get_sstables("keyspace1", "standard1")), jobs, failed))
+        thread.setDaemon(True)
         thread.start()
         node1.nodetool("cleanup -j {} keyspace1 standard1".format(jobs))
         event.set()
         thread.join()
-        self.assertFalse(failed.is_set())
+        assert not failed.is_set()
 
     def _monitor_datadir(self, node, event, basecount, jobs, failed):
         while True:
             sstables = [s for s in node.get_sstables("keyspace1", "standard1") 
if "tmplink" not in s]
-            debug("---")
+            logger.debug("---")
             for sstable in sstables:
-                debug(sstable)
+                logger.debug(sstable)
             if len(sstables) > basecount + jobs:
-                debug("Current count is {}, basecount was 
{}".format(len(sstables), basecount))
+                logger.debug("Current count is {}, basecount was 
{}".format(len(sstables), basecount))
                 failed.set()
                 return
             if event.is_set():
@@ -705,6 +700,6 @@ class TestBootstrap(BaseBootstrapTest):
     def _cleanup(self, node):
         commitlog_dir = os.path.join(node.get_path(), 'commitlogs')
         for data_dir in node.data_directories():
-            debug("Deleting {}".format(data_dir))
+            logger.debug("Deleting {}".format(data_dir))
             shutil.rmtree(data_dir)
         shutil.rmtree(commitlog_dir)
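
Note: test_resumable_bootstrap above now polls bootstrap state through
tools.misc.retry_till_success. The general shape of such a retry helper is
sketched below; the actual implementation in tools/misc.py may differ.

    import time

    def retry_till_success(fun, *args, timeout=60, **kwargs):
        # Keep calling fun until it stops raising or the timeout elapses.
        deadline = time.time() + timeout
        while True:
            try:
                return fun(*args, **kwargs)
            except Exception:
                if time.time() >= deadline:
                    raise  # give up and surface the last failure
                time.sleep(0.5)  # brief backoff before retrying

With that shape, retry_till_success(assert_bootstrap_state, tester=self,
node=node3, expected_bootstrap_state='IN_PROGRESS', timeout=120) re-runs the
assertion for up to two minutes instead of failing on the first check.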

