http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/dtest.py
----------------------------------------------------------------------
diff --git a/dtest.py b/dtest.py
index 25c52e3..aad9e58 100644
--- a/dtest.py
+++ b/dtest.py
@@ -1,50 +1,30 @@
-from __future__ import with_statement
-
-import ConfigParser
+import configparser
 import copy
-import errno
-import glob
 import logging
 import os
-import pprint
 import re
-import shutil
-import signal
 import subprocess
 import sys
-import tempfile
-import thread
 import threading
 import time
 import traceback
-import types
-import unittest.case
-from collections import OrderedDict
-from subprocess import CalledProcessError
-from unittest import TestCase
-
+import pytest
 import cassandra
 import ccmlib.repository
-from cassandra import ConsistencyLevel
+
+from subprocess import CalledProcessError
+
+from flaky import flaky
+
+from cassandra import ConsistencyLevel, OperationTimedOut
 from cassandra.auth import PlainTextAuthProvider
-from cassandra.cluster import Cluster as PyCluster
-from cassandra.cluster import NoHostAvailable
-from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT
-from cassandra.policies import RetryPolicy, WhiteListRoundRobinPolicy
-from ccmlib.cluster import Cluster
-from ccmlib.cluster_factory import ClusterFactory
+from cassandra.cluster import ExecutionProfile
+from cassandra.policies import RetryPolicy, RoundRobinPolicy
 from ccmlib.common import get_version_from_build, is_win
+from ccmlib.node import ToolError, TimeoutError
 from distutils.version import LooseVersion
-from nose.exc import SkipTest
-from nose.tools import assert_greater_equal
-from six import print_
+from tools.misc import retry_till_success
 
-from plugins.dtestconfig import _CONFIG as CONFIG
-# We don't want test files to know about the plugins module, so we import
-# constants here and re-export them.
-from plugins.dtestconfig import GlobalConfigObject
-from tools.context import log_filter
-from tools.funcutils import merge_dicts
 
 LOG_SAVED_DIR = "logs"
 try:
@@ -57,52 +37,14 @@ LAST_LOG = os.path.join(LOG_SAVED_DIR, "last")
 LAST_TEST_DIR = 'last_test_dir'
 
 DEFAULT_DIR = './'
-config = ConfigParser.RawConfigParser()
+config = configparser.RawConfigParser()
 if len(config.read(os.path.expanduser('~/.cassandra-dtest'))) > 0:
     if config.has_option('main', 'default_dir'):
         DEFAULT_DIR = os.path.expanduser(config.get('main', 'default_dir'))
-CASSANDRA_DIR = os.environ.get('CASSANDRA_DIR', DEFAULT_DIR)
-
-NO_SKIP = os.environ.get('SKIP', '').lower() in ('no', 'false')
-DEBUG = os.environ.get('DEBUG', '').lower() in ('yes', 'true')
-TRACE = os.environ.get('TRACE', '').lower() in ('yes', 'true')
-KEEP_LOGS = os.environ.get('KEEP_LOGS', '').lower() in ('yes', 'true')
-KEEP_TEST_DIR = os.environ.get('KEEP_TEST_DIR', '').lower() in ('yes', 'true')
-PRINT_DEBUG = os.environ.get('PRINT_DEBUG', '').lower() in ('yes', 'true')
-OFFHEAP_MEMTABLES = os.environ.get('OFFHEAP_MEMTABLES', '').lower() in ('yes', 'true')
-NUM_TOKENS = os.environ.get('NUM_TOKENS', '256')
-RECORD_COVERAGE = os.environ.get('RECORD_COVERAGE', '').lower() in ('yes', 'true')
-IGNORE_REQUIRE = os.environ.get('IGNORE_REQUIRE', '').lower() in ('yes', 'true')
-DATADIR_COUNT = os.environ.get('DATADIR_COUNT', '3')
-ENABLE_ACTIVE_LOG_WATCHING = os.environ.get('ENABLE_ACTIVE_LOG_WATCHING', '').lower() in ('yes', 'true')
-RUN_STATIC_UPGRADE_MATRIX = os.environ.get('RUN_STATIC_UPGRADE_MATRIX', '').lower() in ('yes', 'true')
-
-# devault values for configuration from configuration plugin
-_default_config = GlobalConfigObject(
-    vnodes=True,
-)
-
-if CONFIG is None:
-    CONFIG = _default_config
 
-DISABLE_VNODES = not CONFIG.vnodes
-
-
-if os.environ.get('DISABLE_VNODES', '').lower() in ('yes', 'true'):
-    print 'DISABLE_VNODES environment variable deprecated. Use `./run_dtests.py --vnodes false` instead.'
-
-
-CURRENT_TEST = ""
-
-logging.basicConfig(filename=os.path.join(LOG_SAVED_DIR, "dtest.log"),
-                    filemode='w',
-                    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
-                    datefmt='%H:%M:%S',
-                    level=logging.DEBUG)
+RUN_STATIC_UPGRADE_MATRIX = os.environ.get('RUN_STATIC_UPGRADE_MATRIX', '').lower() in ('yes', 'true')
 
-LOG = logging.getLogger('dtest')
-# set python-driver log level to INFO by default for dtest
-logging.getLogger('cassandra').setLevel(logging.INFO)
+logger = logging.getLogger(__name__)
 
 
 def get_sha(repo_dir):
@@ -111,11 +53,13 @@ def get_sha(repo_dir):
         prefix = 'github:apache/'
         local_repo_location = os.environ.get('LOCAL_GIT_REPO')
         if local_repo_location is not None:
-            prefix = 'local:{}:'.format(local_repo_location)  # local: slugs take the form 'local:/some/path/to/cassandra/:branch_name_or_sha'
+            prefix = 'local:{}:'.format(local_repo_location)
+            # local: slugs take the form 'local:/some/path/to/cassandra/:branch_name_or_sha'
         return "{}{}".format(prefix, output)
     except CalledProcessError as e:
-        if re.search('Not a git repository', e.message) is not None:
-            # we tried to get a sha, but repo_dir isn't a git repo. No big deal, must just be working from a non-git install.
+        if re.search('Not a git repository', str(e)) is not None:
+            # we tried to get a sha, but repo_dir isn't a git repo. No big deal, must just be
+            # working from a non-git install.
             return None
         else:
             # git call failed for some unknown reason
@@ -137,33 +81,12 @@ if _cassandra_version_slug:
     CASSANDRA_VERSION_FROM_BUILD = get_version_from_build(ccm_repo_cache_dir)
     CASSANDRA_GITREF = get_sha(ccm_repo_cache_dir)  # will be set None when not a git repo
 else:
-    CASSANDRA_VERSION_FROM_BUILD = get_version_from_build(CASSANDRA_DIR)
-    CASSANDRA_GITREF = get_sha(CASSANDRA_DIR)
-
+    CASSANDRA_VERSION_FROM_BUILD = LooseVersion("4.0")  # TODO: hard-coded placeholder until this can come from dtest_config
+    CASSANDRA_GITREF = ""
+    # CASSANDRA_VERSION_FROM_BUILD = get_version_from_build(self.dtest_config.cassandra_dir)
+    # CASSANDRA_GITREF = get_sha(dtest_config.cassandra_dir)
 
-# Determine the location of the libjemalloc jar so that we can specify it
-# through environment variables when start Cassandra.  This reduces startup
-# time, making the dtests run faster.
-def find_libjemalloc():
-    if is_win():
-        # let the normal bat script handle finding libjemalloc
-        return ""
 
-    this_dir = os.path.dirname(os.path.realpath(__file__))
-    script = os.path.join(this_dir, "findlibjemalloc.sh")
-    try:
-        p = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        stdout, stderr = p.communicate()
-        if stderr or not stdout:
-            return "-"  # tells C* not to look for libjemalloc
-        else:
-            return stdout
-    except Exception as exc:
-        print "Failed to run script to prelocate libjemalloc ({}): 
{}".format(script, exc)
-        return ""
-
-
-CASSANDRA_LIBJEMALLOC = find_libjemalloc()
 # copy the initial environment variables so we can reset them later:
 initial_environment = copy.deepcopy(os.environ)
 
@@ -172,40 +95,7 @@ class DtestTimeoutError(Exception):
     pass
 
 
-def reset_environment_vars():
-    os.environ.clear()
-    os.environ.update(initial_environment)
-
-
-def warning(msg):
-    LOG.warning("{} - {}".format(CURRENT_TEST, msg))
-    if PRINT_DEBUG:
-        print "WARN: " + msg
-
-
-def debug(msg):
-    LOG.debug("{} - {}".format(CURRENT_TEST, msg))
-    if PRINT_DEBUG:
-        print msg
-
-
-debug("Python driver version in use: {}".format(cassandra.__version__))
-
-
-def retry_till_success(fun, *args, **kwargs):
-    timeout = kwargs.pop('timeout', 60)
-    bypassed_exception = kwargs.pop('bypassed_exception', Exception)
-
-    deadline = time.time() + timeout
-    while True:
-        try:
-            return fun(*args, **kwargs)
-        except bypassed_exception:
-            if time.time() > deadline:
-                raise
-            else:
-                # brief pause before next attempt
-                time.sleep(0.25)
+logger.debug("Python driver version in use: {}".format(cassandra.__version__))
 
 
 class FlakyRetryPolicy(RetryPolicy):
@@ -219,21 +109,21 @@ class FlakyRetryPolicy(RetryPolicy):
 
     def on_read_timeout(self, *args, **kwargs):
         if kwargs['retry_num'] < self.max_retries:
-            debug("Retrying read after timeout. Attempt #" + 
str(kwargs['retry_num']))
+            logger.debug("Retrying read after timeout. Attempt #" + 
str(kwargs['retry_num']))
             return (self.RETRY, None)
         else:
             return (self.RETHROW, None)
 
     def on_write_timeout(self, *args, **kwargs):
         if kwargs['retry_num'] < self.max_retries:
-            debug("Retrying write after timeout. Attempt #" + 
str(kwargs['retry_num']))
+            logger.debug("Retrying write after timeout. Attempt #" + 
str(kwargs['retry_num']))
             return (self.RETRY, None)
         else:
             return (self.RETHROW, None)
 
     def on_unavailable(self, *args, **kwargs):
         if kwargs['retry_num'] < self.max_retries:
-            debug("Retrying request after UE. Attempt #" + 
str(kwargs['retry_num']))
+            logger.debug("Retrying request after UE. Attempt #" + 
str(kwargs['retry_num']))
             return (self.RETRY, None)
         else:
             return (self.RETHROW, None)
@@ -261,8 +151,18 @@ class Runner(threading.Thread):
             i = i + 1
 
     def stop(self):
+        if self.__stopped:
+            return
+
         self.__stopped = True
-        self.join()
+        # pytest may appear to hang forever waiting for cluster tear down. Are all driver session objects shut down?
+        # To debug a hang you can add the following at the top of the test:
+        #     import faulthandler
+        #     faulthandler.enable()
+        #
+        # and then, when the hang occurs, send SIGABRT to the pytest process (e.g. kill -SIGABRT <pytest_pid>);
+        # this will print a python thread dump of all currently alive threads
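+        # joining with a timeout bounds how long a wedged runner thread can stall test teardown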
+        self.join(timeout=30)
         if self.__error is not None:
             raise self.__error
 
@@ -272,344 +172,103 @@ class Runner(threading.Thread):
 
 
 def make_execution_profile(retry_policy=FlakyRetryPolicy(), consistency_level=ConsistencyLevel.ONE, **kwargs):
-    return ExecutionProfile(retry_policy=retry_policy,
-                            consistency_level=consistency_level,
-                            **kwargs)
-
-
-class Tester(TestCase):
-
-    maxDiff = None
-    allow_log_errors = False  # scan the log of each node for errors after every test.
-    cluster_options = None
-
-    def set_node_to_current_version(self, node):
-        version = os.environ.get('CASSANDRA_VERSION')
-        cdir = CASSANDRA_DIR
-
-        if version:
-            node.set_install_dir(version=version)
-        else:
-            node.set_install_dir(install_dir=cdir)
-
-    def init_config(self):
-        init_default_config(self.cluster, self.cluster_options)
-
-    def setUp(self):
-        self.set_current_tst_name()
-        kill_windows_cassandra_procs()
-        maybe_cleanup_cluster_from_last_test_file()
-
-        self.test_path = get_test_path()
-        self.cluster = create_ccm_cluster(self.test_path, name='test')
-
-        self.maybe_begin_active_log_watch()
-        maybe_setup_jacoco(self.test_path)
-
-        self.init_config()
-        write_last_test_file(self.test_path, self.cluster)
-
-        set_log_levels(self.cluster)
-        self.connections = []
-        self.runners = []
-
-    # this is intentionally spelled 'tst' instead of 'test' to avoid
-    # making unittest think it's a test method
-    def set_current_tst_name(self):
-        global CURRENT_TEST
-        CURRENT_TEST = self.id()
-
-    def maybe_begin_active_log_watch(self):
-        if ENABLE_ACTIVE_LOG_WATCHING:
-            if not self.allow_log_errors:
-                self.begin_active_log_watch()
-
-    def begin_active_log_watch(self):
-        """
-        Calls into ccm to start actively watching logs.
-
-        In the event that errors are seen in logs, ccm will call back to 
_log_error_handler.
-
-        When the cluster is no longer in use, stop_active_log_watch should be 
called to end log watching.
-        (otherwise a 'daemon' thread will (needlessly) run until the process 
exits).
-        """
-        # log watching happens in another thread, but we want it to halt the 
main
-        # thread's execution, which we have to do by registering a signal 
handler
-        signal.signal(signal.SIGINT, self._catch_interrupt)
-        self._log_watch_thread = 
self.cluster.actively_watch_logs_for_error(self._log_error_handler, 
interval=0.25)
-
-    def _log_error_handler(self, errordata):
-        """
-        Callback handler used in conjunction with begin_active_log_watch.
-        When called, prepares exception instance, then will indirectly
-        cause _catch_interrupt to be called, which can raise the exception in 
the main
-        program thread.
-
-        @param errordata is a dictonary mapping node name to failure list.
-        """
-        # in some cases self.allow_log_errors may get set after proactive log 
checking has been enabled
-        # so we need to double-check first thing before proceeding
-        if self.allow_log_errors:
-            return
-
-        reportable_errordata = OrderedDict()
-
-        for nodename, errors in errordata.items():
-            filtered_errors = list(self.__filter_errors(['\n'.join(msg) for 
msg in errors]))
-            if len(filtered_errors) is not 0:
-                reportable_errordata[nodename] = filtered_errors
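+    # if the caller did not supply a load_balancing_policy, fall back to an explicit
+    # RoundRobinPolicy instead of relying on the driver's default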
+    if 'load_balancing_policy' in kwargs:
+        return ExecutionProfile(retry_policy=retry_policy,
+                                consistency_level=consistency_level,
+                                **kwargs)
+    else:
+        return ExecutionProfile(retry_policy=retry_policy,
+                                consistency_level=consistency_level,
+                                load_balancing_policy=RoundRobinPolicy(),
+                                **kwargs)
 
-        # no errors worthy of halting the test
-        if not reportable_errordata:
-            return
 
-        message = "Errors seen in logs for: {nodes}".format(nodes=", 
".join(reportable_errordata.keys()))
-        for nodename, errors in reportable_errordata.items():
-            for error in errors:
-                message += "\n{nodename}: {error}".format(nodename=nodename, 
error=error)
+def running_in_docker():
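+    # docker creates /.dockerenv inside containers, so its presence is a cheap way to detect we are in one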
+    return os.path.isfile('/.dockerenv')
 
-        try:
-            debug('Errors were just seen in logs, ending test (if not ending 
already)!')
-            print_("Error details: \n{message}".format(message=message))
-            self.test_is_ending  # will raise AttributeError if not present
-        except AttributeError:
-            self.test_is_ending = True
-            self.exit_with_exception = AssertionError("Log error encountered 
during active log scanning, see stdout")
-            # thread.interrupt_main will SIGINT in the main thread, which we 
can
-            # catch to raise an exception with useful information
-            thread.interrupt_main()
 
+def cleanup_docker_environment_before_test_execution():
+    """
+    Perform a number of system cleanup operations, like killing any Cassandra instances that
+    might be hanging around incorrectly from a previous run, syncing the disk, and clearing swap.
+    Ideally we would also drop the page cache, but as docker isn't running in privileged
+    mode there is no way for us to do this.
     """
-    Finds files matching the glob pattern specified as argument on
-    the given keyspace in all nodes
+    # attempt to whack all existing running Cassandra processes forcefully to get us into a clean state
+    p_kill = subprocess.Popen('ps aux | grep -ie CassandraDaemon | grep java | awk \'{print $2}\' | xargs kill -9',
+                              shell=True)
+    p_kill.wait(timeout=10)
+
+    # explicitly call "sync" to flush everything that might be pending from a previous test,
+    # so tests are less likely to hit a very slow fsync during the test by starting from a 'known' state
+    # note: to mitigate this further the docker image mounts /tmp as a volume, which gives
+    # us an ext4 mount which should talk directly to the underlying device on the host, skipping
+    # the aufs pain that we get with anything else running in the docker image. Originally
+    # the timeout here was 120 seconds (2 minutes) and then 300 seconds (5 minutes), but sync was still occasionally timing out.
+    p_sync = subprocess.Popen('sudo /bin/sync', shell=True)
+    p_sync.wait(timeout=600)
+
+    # turn swap off and back on to make sure it's fully cleared if anything happened to swap
+    # from a previous test run
+    p_swap = subprocess.Popen('sudo /sbin/swapoff -a && sudo /sbin/swapon -a', shell=True)
+    p_swap.wait(timeout=60)
+
+
+def test_failure_due_to_timeout(err, *args):
     """
+    Check whether we should rerun a test with the flaky plugin or not.
+    For now, only rerun if the test failed with one of the following
+    three exceptions: cassandra.OperationTimedOut, ccm.node.ToolError,
+    and ccm.node.TimeoutError.
+
+    - cassandra.OperationTimedOut will be thrown when a cql query made through
+    the python-driver times out.
+    - ccm.node.ToolError will be thrown when an invocation of a "tool" fails
+    (in the case of dtests this will almost always mean invoking stress).
+    - ccm.node.TimeoutError will be thrown when a blocking ccm operation
+    on an individual node times out. In most cases this tends to be something
+    like watch_log_for hitting the timeout before the desired pattern is seen
+    in the node's logs.
+
+    If we failed for one of these reasons - and we're running in docker - run
+    the same "cleanup" logic we run before test execution and test setup begin,
+    and for good measure introduce a 2 second sleep. Why 2 seconds? Because it's
+    magic :) - ideally this gets the environment back into a good state and makes
+    the rerun of flaky tests likely to succeed if they failed in the first place
+    due to environmental issues.
+    """
+    if issubclass(err[0], OperationTimedOut) or issubclass(err[0], ToolError) or issubclass(err[0], TimeoutError):
+        if running_in_docker():
+            cleanup_docker_environment_before_test_execution()
+            time.sleep(2)
+        return True
+    else:
+        return False
 
-    def glob_data_dirs(self, path, ks="ks"):
-        result = []
-        for node in self.cluster.nodelist():
-            for data_dir in node.data_directories():
-                ks_dir = os.path.join(data_dir, ks, path)
-                result.extend(glob.glob(ks_dir))
-        return result
-
-    def _catch_interrupt(self, signal, frame):
-        """
-        Signal handler for registering on SIGINT.
-
-        If called will look for a stored exception and raise it to abort test.
-        If a stored exception is not present, this handler has likely caught a
-        user interrupt via CTRL-C, and will raise a KeyboardInterrupt.
-        """
-        try:
-            # check if we have a persisted exception to fail with
-            raise self.exit_with_exception
-        except AttributeError:
-            # looks like this was just a plain CTRL-C event
-            raise KeyboardInterrupt()
-
-    def copy_logs(self, cluster, directory=None, name=None):
-        """Copy the current cluster's log files somewhere, by default to 
LOG_SAVED_DIR with a name of 'last'"""
-        if directory is None:
-            directory = LOG_SAVED_DIR
-        if name is None:
-            name = LAST_LOG
-        else:
-            name = os.path.join(directory, name)
-        if not os.path.exists(directory):
-            os.mkdir(directory)
-        logs = [(node.name, node.logfilename(), node.debuglogfilename(), 
node.gclogfilename(), node.compactionlogfilename())
-                for node in self.cluster.nodes.values()]
-        if len(logs) is not 0:
-            basedir = str(int(time.time() * 1000)) + '_' + self.id()
-            logdir = os.path.join(directory, basedir)
-            os.mkdir(logdir)
-            for n, log, debuglog, gclog, compactionlog in logs:
-                if os.path.exists(log):
-                    self.assertGreaterEqual(os.path.getsize(log), 0)
-                    shutil.copyfile(log, os.path.join(logdir, n + ".log"))
-                if os.path.exists(debuglog):
-                    self.assertGreaterEqual(os.path.getsize(debuglog), 0)
-                    shutil.copyfile(debuglog, os.path.join(logdir, n + 
"_debug.log"))
-                if os.path.exists(gclog):
-                    self.assertGreaterEqual(os.path.getsize(gclog), 0)
-                    shutil.copyfile(gclog, os.path.join(logdir, n + "_gc.log"))
-                if os.path.exists(compactionlog):
-                    self.assertGreaterEqual(os.path.getsize(compactionlog), 0)
-                    shutil.copyfile(compactionlog, os.path.join(logdir, n + 
"_compaction.log"))
-            if os.path.exists(name):
-                os.unlink(name)
-            if not is_win():
-                os.symlink(basedir, name)
-
-    def cql_connection(self, node, keyspace=None, user=None,
-                       password=None, compression=True, protocol_version=None, 
port=None, ssl_opts=None, **kwargs):
-
-        return self._create_session(node, keyspace, user, password, 
compression,
-                                    protocol_version, port=port, 
ssl_opts=ssl_opts, **kwargs)
-
-    def exclusive_cql_connection(self, node, keyspace=None, user=None,
-                                 password=None, compression=True, 
protocol_version=None, port=None, ssl_opts=None, **kwargs):
-
-        node_ip = get_ip_from_node(node)
-        wlrr = WhiteListRoundRobinPolicy([node_ip])
-
-        return self._create_session(node, keyspace, user, password, 
compression,
-                                    protocol_version, port=port, 
ssl_opts=ssl_opts, load_balancing_policy=wlrr, **kwargs)
-
-    def _create_session(self, node, keyspace, user, password, compression, 
protocol_version,
-                        port=None, ssl_opts=None, execution_profiles=None, 
**kwargs):
-        node_ip = get_ip_from_node(node)
-        if not port:
-            port = get_port_from_node(node)
-
-        if protocol_version is None:
-            protocol_version = 
get_eager_protocol_version(node.cluster.version())
-
-        if user is not None:
-            auth_provider = get_auth_provider(user=user, password=password)
-        else:
-            auth_provider = None
-
-        profiles = {EXEC_PROFILE_DEFAULT: make_execution_profile(**kwargs)
-                    } if not execution_profiles else execution_profiles
-
-        cluster = PyCluster([node_ip],
-                            auth_provider=auth_provider,
-                            compression=compression,
-                            protocol_version=protocol_version,
-                            port=port,
-                            ssl_options=ssl_opts,
-                            connect_timeout=10,
-                            allow_beta_protocol_version=True,
-                            execution_profiles=profiles)
-        session = cluster.connect(wait_for_all_pools=True)
-
-        if keyspace is not None:
-            session.set_keyspace(keyspace)
-
-        self.connections.append(session)
-        return session
-
-    def patient_cql_connection(self, node, keyspace=None,
-                               user=None, password=None, timeout=30, 
compression=True,
-                               protocol_version=None, port=None, 
ssl_opts=None, **kwargs):
-        """
-        Returns a connection after it stops throwing NoHostAvailables due to 
not being ready.
-
-        If the timeout is exceeded, the exception is raised.
-        """
-        if is_win():
-            timeout *= 2
-
-        expected_log_lines = ('Control connection failed to connect, shutting 
down Cluster:', '[control connection] Error connecting to ')
-        with log_filter('cassandra.cluster', expected_log_lines):
-            session = retry_till_success(
-                self.cql_connection,
-                node,
-                keyspace=keyspace,
-                user=user,
-                password=password,
-                timeout=timeout,
-                compression=compression,
-                protocol_version=protocol_version,
-                port=port,
-                ssl_opts=ssl_opts,
-                bypassed_exception=NoHostAvailable,
-                **kwargs
-            )
-
-        return session
-
-    def patient_exclusive_cql_connection(self, node, keyspace=None,
-                                         user=None, password=None, timeout=30, 
compression=True,
-                                         protocol_version=None, port=None, 
ssl_opts=None, **kwargs):
-        """
-        Returns a connection after it stops throwing NoHostAvailables due to 
not being ready.
-
-        If the timeout is exceeded, the exception is raised.
-        """
-        if is_win():
-            timeout *= 2
-
-        return retry_till_success(
-            self.exclusive_cql_connection,
-            node,
-            keyspace=keyspace,
-            user=user,
-            password=password,
-            timeout=timeout,
-            compression=compression,
-            protocol_version=protocol_version,
-            port=port,
-            ssl_opts=ssl_opts,
-            bypassed_exception=NoHostAvailable,
-            **kwargs
-        )
-
-    @classmethod
-    def tearDownClass(cls):
-        reset_environment_vars()
-        if os.path.exists(LAST_TEST_DIR):
-            with open(LAST_TEST_DIR) as f:
-                test_path = f.readline().strip('\n')
-                name = f.readline()
-                try:
-                    cluster = ClusterFactory.load(test_path, name)
-                    # Avoid waiting too long for node to be marked down
-                    if KEEP_TEST_DIR:
-                        cluster.stop(gently=RECORD_COVERAGE)
-                    else:
-                        cluster.remove()
-                        os.rmdir(test_path)
-                except IOError:
-                    # after a restart, /tmp will be emptied so we'll get an 
IOError when loading the old cluster here
-                    pass
-            try:
-                os.remove(LAST_TEST_DIR)
-            except IOError:
-                # Ignore - see comment above
-                pass
 
-    def tearDown(self):
-        # test_is_ending prevents active log watching from being able to 
interrupt the test
-        # which we don't want to happen once tearDown begins
-        self.test_is_ending = True
+@flaky(rerun_filter=test_failure_due_to_timeout)
+class Tester:
 
-        reset_environment_vars()
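+    # attribute lookups that miss on the Tester instance are delegated to the underlying
+    # DTestSetup fixture, so tests can keep using helpers such as self.cluster or
+    # self.cql_connection that now live on fixture_dtest_setup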
+    def __getattribute__(self, name):
+        try:
+            return object.__getattribute__(self, name)
+        except AttributeError:
+            fixture_dtest_setup = object.__getattribute__(self, 'fixture_dtest_setup')
+            return object.__getattribute__(fixture_dtest_setup, name)
 
-        for con in self.connections:
-            con.cluster.shutdown()
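+    # autouse fixture: stash the per-test DTestSetup and DTestConfig on the Tester instance
+    # so the attribute delegation above has something to fall back to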
+    @pytest.fixture(scope='function', autouse=True)
+    def set_dtest_setup_on_function(self, fixture_dtest_setup, fixture_dtest_config):
+        self.fixture_dtest_setup = fixture_dtest_setup
+        self.dtest_config = fixture_dtest_config
 
-        for runner in self.runners:
-            try:
-                runner.stop()
-            except Exception:
-                pass
+    def set_node_to_current_version(self, node):
+        version = os.environ.get('CASSANDRA_VERSION')
 
-        failed = did_fail()
-        try:
-            if not self.allow_log_errors and self.check_logs_for_errors():
-                failed = True
-                raise AssertionError('Unexpected error in log, see stdout')
-        finally:
-            try:
-                # save the logs for inspection
-                if failed or KEEP_LOGS:
-                    self.copy_logs(self.cluster)
-            except Exception as e:
-                print "Error saving log:", str(e)
-            finally:
-                log_watch_thread = getattr(self, '_log_watch_thread', None)
-                cleanup_cluster(self.cluster, self.test_path, log_watch_thread)
-
-    def check_logs_for_errors(self):
-        for node in self.cluster.nodelist():
-            errors = list(self.__filter_errors(
-                ['\n'.join(msg) for msg in node.grep_log_for_errors()]))
-            if len(errors) is not 0:
-                for error in errors:
-                    print_("Unexpected error in {node_name} log, error: 
\n{error}".format(node_name=node.name, error=error))
-                return True
+        if version:
+            node.set_install_dir(version=version)
+        else:
+            node.set_install_dir(install_dir=self.dtest_config.cassandra_dir)
+            os.environ['CASSANDRA_DIR'] = self.dtest_config.cassandra_dir
 
     def go(self, func):
         runner = Runner(func)
@@ -617,57 +276,6 @@ class Tester(TestCase):
         runner.start()
         return runner
 
-    def skip(self, msg):
-        if not NO_SKIP:
-            raise SkipTest(msg)
-
-    def __filter_errors(self, errors):
-        """Filter errors, removing those that match self.ignore_log_patterns"""
-        if not hasattr(self, 'ignore_log_patterns'):
-            self.ignore_log_patterns = []
-        for e in errors:
-            for pattern in self.ignore_log_patterns:
-                if re.search(pattern, e):
-                    break
-            else:
-                yield e
-
-    # Disable docstrings printing in nosetest output
-    def shortDescription(self):
-        return None
-
-    def get_jfr_jvm_args(self):
-        """
-        @return The JVM arguments required for attaching flight recorder to a 
Java process.
-        """
-        return ["-XX:+UnlockCommercialFeatures", "-XX:+FlightRecorder"]
-
-    def start_jfr_recording(self, nodes):
-        """
-        Start Java flight recorder provided the cluster was started with the 
correct jvm arguments.
-        """
-        for node in nodes:
-            p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.start'],
-                                 stdout=subprocess.PIPE,
-                                 stderr=subprocess.PIPE)
-            stdout, stderr = p.communicate()
-            debug(stdout)
-            debug(stderr)
-
-    def dump_jfr_recording(self, nodes):
-        """
-        Save Java flight recorder results to file for analyzing with mission 
control.
-        """
-        for node in nodes:
-            p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.dump',
-                                  'recording=1', 
'filename=recording_{}.jfr'.format(node.address())],
-                                 stdout=subprocess.PIPE,
-                                 stderr=subprocess.PIPE)
-            stdout, stderr = p.communicate()
-            debug(stdout)
-            debug(stderr)
-
-
 def get_eager_protocol_version(cassandra_version):
     """
     Returns the highest protocol version accepted
@@ -690,7 +298,7 @@ def create_cf(session, name, key_type="varchar", speculative_retry=None, read_re
 
     additional_columns = ""
     if columns is not None:
-        for k, v in columns.items():
+        for k, v in list(columns.items()):
             additional_columns = "{}, {} {}".format(additional_columns, k, v)
 
     if additional_columns == "":
@@ -714,20 +322,42 @@ def create_cf(session, name, key_type="varchar", speculative_retry=None, read_re
     if compact_storage:
         query += ' AND COMPACT STORAGE'
 
-    session.execute(query)
-    time.sleep(0.2)
-
+    try:
+        retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut)
+    except cassandra.AlreadyExists:
+        logger.warning('AlreadyExists executing create cf query \'%s\'' % query)
+    session.cluster.control_connection.wait_for_schema_agreement(wait_time=120)
+    # Going to ignore OperationTimedOut from create CF, so need to validate it was indeed created
+    session.execute('SELECT * FROM %s LIMIT 1' % name)
+
+def create_cf_simple(session, name, query):
+    try:
+        retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut)
+    except cassandra.AlreadyExists:
+        logger.warning('AlreadyExists executing create cf query \'%s\'' % query)
+    session.cluster.control_connection.wait_for_schema_agreement(wait_time=120)
+    # Going to ignore OperationTimedOut from create CF, so need to validate it was indeed created
+    session.execute('SELECT * FROM %s LIMIT 1' % name)
 
 def create_ks(session, name, rf):
     query = 'CREATE KEYSPACE %s WITH replication={%s}'
-    if isinstance(rf, types.IntType):
+    if isinstance(rf, int):
         # we assume simpleStrategy
-        session.execute(query % (name, "'class':'SimpleStrategy', 
'replication_factor':%d" % rf))
+        query = query % (name, "'class':'SimpleStrategy', 
'replication_factor':%d" % rf)
     else:
-        assert_greater_equal(len(rf), 0, "At least one datacenter/rf pair is needed")
+        assert len(rf) >= 1, "At least one datacenter/rf pair is needed"
         # we assume networkTopologyStrategy
-        options = (', ').join(['\'%s\':%d' % (d, r) for d, r in rf.iteritems()])
-        session.execute(query % (name, "'class':'NetworkTopologyStrategy', %s" % options))
+        options = (', ').join(['\'%s\':%d' % (d, r) for d, r in rf.items()])
+        query = query % (name, "'class':'NetworkTopologyStrategy', %s" % options)
+
+    try:
+        retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut)
+    except cassandra.AlreadyExists:
+        logger.warning('AlreadyExists executing create ks query \'%s\'' % query)
+
+    session.cluster.control_connection.wait_for_schema_agreement(wait_time=120)
+    # Also validates it was indeed created even though we ignored OperationTimedOut,
+    # which might happen some of the time because CircleCI disk IO is unreliable and hangs randomly
     session.execute('USE {}'.format(name))
 
 
@@ -774,301 +404,13 @@ def kill_windows_cassandra_procs():
                     pass
                 else:
                     if (pinfo['name'] == 'java.exe' and '-Dcassandra' in pinfo['cmdline']):
-                        print 'Found running cassandra process with pid: ' + str(pinfo['pid']) + '. Killing.'
+                        print('Found running cassandra process with pid: ' + str(pinfo['pid']) + '. Killing.')
                         psutil.Process(pinfo['pid']).kill()
         except ImportError:
-            debug("WARN: psutil not installed. Cannot detect and kill "
+            logger.debug("WARN: psutil not installed. Cannot detect and kill "
                   "running cassandra processes - you may see cascading dtest failures.")
 
 
-def get_test_path():
-    test_path = tempfile.mkdtemp(prefix='dtest-')
-
-    # ccm on cygwin needs absolute path to directory - it crosses from cygwin 
space into
-    # regular Windows space on wmic calls which will otherwise break pathing
-    if sys.platform == "cygwin":
-        process = subprocess.Popen(["cygpath", "-m", test_path], 
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-        test_path = process.communicate()[0].rstrip()
-
-    return test_path
-
-
-# nose will discover this as a test, so we manually make it not a test
-get_test_path.__test__ = False
-
-
-def create_ccm_cluster(test_path, name):
-    debug("cluster ccm directory: " + test_path)
-    version = os.environ.get('CASSANDRA_VERSION')
-    cdir = CASSANDRA_DIR
-
-    if version:
-        cluster = Cluster(test_path, name, cassandra_version=version)
-    else:
-        cluster = Cluster(test_path, name, cassandra_dir=cdir)
-
-    if DISABLE_VNODES:
-        cluster.set_configuration_options(values={'num_tokens': None})
-    else:
-        cluster.set_configuration_options(values={'initial_token': None, 
'num_tokens': NUM_TOKENS})
-
-    if OFFHEAP_MEMTABLES:
-        cluster.set_configuration_options(values={'memtable_allocation_type': 
'offheap_objects'})
-
-    cluster.set_datadir_count(DATADIR_COUNT)
-    cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', 
CASSANDRA_LIBJEMALLOC)
-
-    return cluster
-
-
-def cleanup_cluster(cluster, test_path, log_watch_thread=None):
-    with log_filter('cassandra'):  # quiet noise from driver when nodes start 
going down
-        if KEEP_TEST_DIR:
-            cluster.stop(gently=RECORD_COVERAGE)
-        else:
-            # when recording coverage the jvm has to exit normally
-            # or the coverage information is not written by the jacoco agent
-            # otherwise we can just kill the process
-            if RECORD_COVERAGE:
-                cluster.stop(gently=True)
-
-            # Cleanup everything:
-            try:
-                if log_watch_thread:
-                    stop_active_log_watch(log_watch_thread)
-            finally:
-                debug("removing ccm cluster {name} at: 
{path}".format(name=cluster.name, path=test_path))
-                cluster.remove()
-
-                debug("clearing ssl stores from [{0}] 
directory".format(test_path))
-                for filename in ('keystore.jks', 'truststore.jks', 
'ccm_node.cer'):
-                    try:
-                        os.remove(os.path.join(test_path, filename))
-                    except OSError as e:
-                        # once we port to py3, which has better reporting for 
exceptions raised while
-                        # handling other excpetions, we should just assert 
e.errno == errno.ENOENT
-                        if e.errno != errno.ENOENT:  # ENOENT = no such file 
or directory
-                            raise
-
-                os.rmdir(test_path)
-                cleanup_last_test_dir()
-
-
-def cleanup_last_test_dir():
-    if os.path.exists(LAST_TEST_DIR):
-        os.remove(LAST_TEST_DIR)
-
-
-def stop_active_log_watch(log_watch_thread):
-    """
-    Joins the log watching thread, which will then exit.
-    Should be called after each test, ideally after nodes are stopped but 
before cluster files are removed.
-
-    Can be called multiple times without error.
-    If not called, log watching thread will remain running until the parent 
process exits.
-    """
-    log_watch_thread.join(timeout=60)
-
-
-def maybe_cleanup_cluster_from_last_test_file():
-    # cleaning up if a previous execution didn't trigger tearDown (which
-    # can happen if it is interrupted by KeyboardInterrupt)
-    if os.path.exists(LAST_TEST_DIR):
-        with open(LAST_TEST_DIR) as f:
-            test_path = f.readline().strip('\n')
-            name = f.readline()
-        try:
-            cluster = ClusterFactory.load(test_path, name)
-            # Avoid waiting too long for node to be marked down
-            cleanup_cluster(cluster, test_path)
-        except IOError:
-            # after a restart, /tmp will be emptied so we'll get an IOError 
when loading the old cluster here
-            pass
-
-
-def init_default_config(cluster, cluster_options):
-    # the failure detector can be quite slow in such tests with quick 
start/stop
-    phi_values = {'phi_convict_threshold': 5}
-
-    timeout = 10000
-    if cluster_options is not None:
-        values = merge_dicts(cluster_options, phi_values)
-    else:
-        values = merge_dicts(phi_values, {
-            'read_request_timeout_in_ms': timeout,
-            'range_request_timeout_in_ms': timeout,
-            'write_request_timeout_in_ms': timeout,
-            'truncate_request_timeout_in_ms': timeout,
-            'request_timeout_in_ms': timeout
-        })
-
-    # No more thrift in 4.0, and start_rpc doesn't exists anymore
-    if cluster.version() >= '4' and 'start_rpc' in values:
-        del values['start_rpc']
-
-    cluster.set_configuration_options(values)
-    debug("Done setting configuration options:\n" + 
pprint.pformat(cluster._config_options, indent=4))
-
-
-def write_last_test_file(test_path, cluster):
-    with open(LAST_TEST_DIR, 'w') as f:
-        f.write(test_path + '\n')
-        f.write(cluster.name)
-
-
-def set_log_levels(cluster):
-    if DEBUG:
-        cluster.set_log_level("DEBUG")
-    if TRACE:
-        cluster.set_log_level("TRACE")
-
-    if os.environ.get('DEBUG', 'no').lower() not in ('no', 'false', 'yes', 
'true'):
-        classes_to_debug = os.environ.get('DEBUG').split(":")
-        cluster.set_log_level('DEBUG', None if len(classes_to_debug) == 0 else 
classes_to_debug)
-
-    if os.environ.get('TRACE', 'no').lower() not in ('no', 'false', 'yes', 
'true'):
-        classes_to_trace = os.environ.get('TRACE').split(":")
-        cluster.set_log_level('TRACE', None if len(classes_to_trace) == 0 else 
classes_to_trace)
-
-
-def maybe_setup_jacoco(test_path, cluster_name='test'):
-    """Setup JaCoCo code coverage support"""
-
-    if not RECORD_COVERAGE:
-        return
-
-    # use explicit agent and execfile locations
-    # or look for a cassandra build if they are not specified
-    cdir = CASSANDRA_DIR
-
-    agent_location = os.environ.get('JACOCO_AGENT_JAR', os.path.join(cdir, 
'build/lib/jars/jacocoagent.jar'))
-    jacoco_execfile = os.environ.get('JACOCO_EXECFILE', os.path.join(cdir, 
'build/jacoco/jacoco.exec'))
-
-    if os.path.isfile(agent_location):
-        debug("Jacoco agent found at {}".format(agent_location))
-        with open(os.path.join(
-                test_path, cluster_name, 'cassandra.in.sh'), 'w') as f:
-
-            f.write('JVM_OPTS="$JVM_OPTS 
-javaagent:{jar_path}=destfile={exec_file}"'
-                    .format(jar_path=agent_location, 
exec_file=jacoco_execfile))
-
-            if os.path.isfile(jacoco_execfile):
-                debug("Jacoco execfile found at {}, execution data will be 
appended".format(jacoco_execfile))
-            else:
-                debug("Jacoco execfile will be created at 
{}".format(jacoco_execfile))
-    else:
-        debug("Jacoco agent not found or is not file. Execution will not be 
recorded.")
-
-
-def did_fail():
-    if sys.exc_info() == (None, None, None):
-        return False
-
-    exc_class, _, _ = sys.exc_info()
-    return not issubclass(exc_class, unittest.case.SkipTest)
-
-
-class ReusableClusterTester(Tester):
-    """
-    A Tester designed for reusing the same cluster across multiple
-    test methods.  This makes test suites with many small tests run
-    much, much faster.  However, there are a couple of downsides:
-
-    First, test setup and teardown must be diligent about cleaning
-    up any data or schema elements that may interfere with other
-    tests.
-
-    Second, errors triggered by one test method may cascade
-    into other test failures.  In an attempt to limit this, the
-    cluster will be restarted if a test fails or an exception is
-    caught.  However, there may still be undetected problems in
-    Cassandra that cause cascading failures.
-    """
-
-    test_path = None
-    cluster = None
-    cluster_options = None
-
-    @classmethod
-    def setUpClass(cls):
-        kill_windows_cassandra_procs()
-        maybe_cleanup_cluster_from_last_test_file()
-        cls.initialize_cluster()
-
-    def setUp(self):
-        self.set_current_tst_name()
-        self.connections = []
-
-        # TODO enable active log watching
-        # This needs to happen in setUp() and not setUpClass() so that 
individual
-        # test methods can set allow_log_errors and so that error handling
-        # only fails a single test method instead of the entire class.
-        # The problem with this is that ccm doesn't yet support stopping the
-        # active log watcher -- it runs until the cluster is destroyed.  Since
-        # we reuse the same cluster, this doesn't work for us.
-
-    def tearDown(self):
-        # test_is_ending prevents active log watching from being able to 
interrupt the test
-        self.test_is_ending = True
-
-        failed = did_fail()
-        try:
-            if not self.allow_log_errors and self.check_logs_for_errors():
-                failed = True
-                raise AssertionError('Unexpected error in log, see stdout')
-        finally:
-            try:
-                # save the logs for inspection
-                if failed or KEEP_LOGS:
-                    self.copy_logs(self.cluster)
-            except Exception as e:
-                print "Error saving log:", str(e)
-            finally:
-                reset_environment_vars()
-                if failed:
-                    cleanup_cluster(self.cluster, self.test_path)
-                    kill_windows_cassandra_procs()
-                    self.initialize_cluster()
-
-    @classmethod
-    def initialize_cluster(cls):
-        """
-        This method is responsible for initializing and configuring a ccm
-        cluster for the next set of tests.  This can be called for two
-        different reasons:
-         * A class of tests is starting
-         * A test method failed/errored, so the cluster has been wiped
-
-        Subclasses that require custom initialization should generally
-        do so by overriding post_initialize_cluster().
-        """
-        cls.test_path = get_test_path()
-        cls.cluster = create_ccm_cluster(cls.test_path, name='test')
-        cls.init_config()
-
-        maybe_setup_jacoco(cls.test_path)
-        cls.init_config()
-        write_last_test_file(cls.test_path, cls.cluster)
-        set_log_levels(cls.cluster)
-
-        cls.post_initialize_cluster()
-
-    @classmethod
-    def post_initialize_cluster(cls):
-        """
-        This method is called after the ccm cluster has been created
-        and default config options have been applied.  Any custom
-        initialization for a test class should generally be done
-        here in order to correctly handle cluster restarts after
-        test method failures.
-        """
-        pass
-
-    @classmethod
-    def init_config(cls):
-        init_default_config(cls.cluster, cls.cluster_options)
-
-
 class MultiError(Exception):
     """
     Extends Exception to provide reporting multiple exceptions at once.
@@ -1109,24 +451,20 @@ def run_scenarios(scenarios, handler, deferred_exceptions=tuple()):
     tracebacks = []
 
     for i, scenario in enumerate(scenarios, 1):
-        debug("running scenario {}/{}: {}".format(i, len(scenarios), scenario))
+        logger.debug("running scenario {}/{}: {}".format(i, len(scenarios), 
scenario))
 
         try:
             handler(scenario)
         except deferred_exceptions as e:
             tracebacks.append(traceback.format_exc(sys.exc_info()))
-            errors.append(type(e)('encountered {} {} running scenario:\n  {}\n'.format(e.__class__.__name__, e.message, scenario)))
-            debug("scenario {}/{} encountered a deferrable exception, continuing".format(i, len(scenarios)))
+            errors.append(type(e)('encountered {} {} running scenario:\n  {}\n'.format(e.__class__.__name__, str(e), scenario)))
+            logger.debug("scenario {}/{} encountered a deferrable exception, continuing".format(i, len(scenarios)))
         except Exception as e:
             # catch-all for any exceptions not intended to be deferred
             tracebacks.append(traceback.format_exc(sys.exc_info()))
-            errors.append(type(e)('encountered {} {} running scenario:\n  {}\n'.format(e.__class__.__name__, e.message, scenario)))
-            debug("scenario {}/{} encountered a non-deferrable exception, aborting".format(i, len(scenarios)))
+            errors.append(type(e)('encountered {} {} running scenario:\n  {}\n'.format(e.__class__.__name__, str(e), scenario)))
+            logger.debug("scenario {}/{} encountered a non-deferrable exception, aborting".format(i, len(scenarios)))
             raise MultiError(errors, tracebacks)
 
     if errors:
         raise MultiError(errors, tracebacks)
-
-
-def supports_v5_protocol(cluster_version):
-    return cluster_version >= LooseVersion('4.0')

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/dtest_setup.py
----------------------------------------------------------------------
diff --git a/dtest_setup.py b/dtest_setup.py
new file mode 100644
index 0000000..87014f4
--- /dev/null
+++ b/dtest_setup.py
@@ -0,0 +1,498 @@
+import pytest
+import glob
+import os
+import shutil
+import time
+import logging
+import re
+import tempfile
+import subprocess
+import sys
+import errno
+import pprint
+from collections import OrderedDict
+
+from cassandra.cluster import Cluster as PyCluster
+from cassandra.cluster import NoHostAvailable
+from cassandra.cluster import EXEC_PROFILE_DEFAULT
+from cassandra.policies import WhiteListRoundRobinPolicy
+from ccmlib.common import get_version_from_build, is_win
+from ccmlib.cluster import Cluster
+
+from dtest import (get_ip_from_node, make_execution_profile, get_auth_provider, get_port_from_node,
+                   get_eager_protocol_version)
+from distutils.version import LooseVersion
+
+from tools.context import log_filter
+from tools.funcutils import merge_dicts
+
+logger = logging.getLogger(__name__)
+
+
+def retry_till_success(fun, *args, **kwargs):
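+    """
+    Keep calling fun(*args, **kwargs), swallowing bypassed_exception, until it succeeds
+    or the timeout (default 60 seconds) expires, pausing 0.25s between attempts.
+    """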
+    timeout = kwargs.pop('timeout', 60)
+    bypassed_exception = kwargs.pop('bypassed_exception', Exception)
+
+    deadline = time.time() + timeout
+    while True:
+        try:
+            return fun(*args, **kwargs)
+        except bypassed_exception:
+            if time.time() > deadline:
+                raise
+            else:
+                # brief pause before next attempt
+                time.sleep(0.25)
+
+
+class DTestSetup:
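+    """
+    Per-test state handed to each Tester by the fixture_dtest_setup fixture: the ccm cluster,
+    open driver sessions, log locations and related bookkeeping.
+    """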
+    def __init__(self, dtest_config=None, setup_overrides=None):
+        self.dtest_config = dtest_config
+        self.setup_overrides = setup_overrides
+        self.ignore_log_patterns = []
+        self.cluster = None
+        self.cluster_options = []
+        self.replacement_node = None
+        self.allow_log_errors = False
+        self.connections = []
+
+        self.log_saved_dir = "logs"
+        try:
+            os.mkdir(self.log_saved_dir)
+        except OSError:
+            pass
+
+        self.last_log = os.path.join(self.log_saved_dir, "last")
+        self.test_path = self.get_test_path()
+        self.enable_for_jolokia = False
+        self.subprocs = []
+        self.log_watch_thread = None
+        self.last_test_dir = "last_test_dir"
+        self.jvm_args = []
+
+    def get_test_path(self):
+        test_path = tempfile.mkdtemp(prefix='dtest-')
+
+        # ccm on cygwin needs absolute path to directory - it crosses from cygwin space into
+        # regular Windows space on wmic calls which will otherwise break pathing
+        if sys.platform == "cygwin":
+            process = subprocess.Popen(["cygpath", "-m", test_path], stdout=subprocess.PIPE,
+                                       stderr=subprocess.STDOUT)
+            test_path = process.communicate()[0].rstrip()
+
+        return test_path
+
+    def glob_data_dirs(self, path, ks="ks"):
+        result = []
+        for node in self.cluster.nodelist():
+            for data_dir in node.data_directories():
+                ks_dir = os.path.join(data_dir, ks, path)
+                result.extend(glob.glob(ks_dir))
+        return result
+
+    def begin_active_log_watch(self):
+        """
+        Calls into ccm to start actively watching logs.
+
+        In the event that errors are seen in logs, ccm will call back to _log_error_handler.
+
+        When the cluster is no longer in use, stop_active_log_watch should be called to end log watching.
+        (otherwise a 'daemon' thread will (needlessly) run until the process exits).
+        """
+        self._log_watch_thread = self.cluster.actively_watch_logs_for_error(self._log_error_handler, interval=0.25)
+
+    def _log_error_handler(self, errordata):
+        """
+        Callback handler used in conjunction with begin_active_log_watch.
+        When called, prepares an error message and uses pytest.fail
+        to kill the currently executing test and mark it as failed.
+
+        @param errordata is a dictionary mapping node name to failure list.
+        """
+        # in some cases self.allow_log_errors may get set after proactive log checking has been enabled
+        # so we need to double-check first thing before proceeding
+        if self.allow_log_errors:
+            return
+
+        reportable_errordata = OrderedDict()
+
+        for nodename, errors in list(errordata.items()):
+            filtered_errors = list(self.__filter_errors(['\n'.join(msg) for msg in errors]))
+            if len(filtered_errors) != 0:
+                reportable_errordata[nodename] = filtered_errors
+
+        # no errors worthy of halting the test
+        if not reportable_errordata:
+            return
+
+        message = "Errors seen in logs for: {nodes}".format(nodes=", 
".join(list(reportable_errordata.keys())))
+        for nodename, errors in list(reportable_errordata.items()):
+            for error in errors:
+                message += "\n{nodename}: {error}".format(nodename=nodename, 
error=error)
+
+        logger.debug('Errors were just seen in logs, ending test (if not 
ending already)!')
+        pytest.fail("Error details: \n{message}".format(message=message))
+
+    def copy_logs(self, directory=None, name=None):
+        """Copy the current cluster's log files somewhere, by default to 
LOG_SAVED_DIR with a name of 'last'"""
+        if directory is None:
+            directory = self.log_saved_dir
+        if name is None:
+            name = self.last_log
+        else:
+            name = os.path.join(directory, name)
+        if not os.path.exists(directory):
+            os.mkdir(directory)
+        logs = [(node.name, node.logfilename(), node.debuglogfilename(), node.gclogfilename(),
+                 node.compactionlogfilename())
+                for node in list(self.cluster.nodes.values())]
+        if len(logs) != 0:
+            basedir = str(int(time.time() * 1000)) + '_' + str(id(self))
+            logdir = os.path.join(directory, basedir)
+            os.mkdir(logdir)
+            for n, log, debuglog, gclog, compactionlog in logs:
+                if os.path.exists(log):
+                    assert os.path.getsize(log) >= 0
+                    shutil.copyfile(log, os.path.join(logdir, n + ".log"))
+                if os.path.exists(debuglog):
+                    assert os.path.getsize(debuglog) >= 0
+                    shutil.copyfile(debuglog, os.path.join(logdir, n + "_debug.log"))
+                if os.path.exists(gclog):
+                    assert os.path.getsize(gclog) >= 0
+                    shutil.copyfile(gclog, os.path.join(logdir, n + "_gc.log"))
+                if os.path.exists(compactionlog):
+                    assert os.path.getsize(compactionlog) >= 0
+                    shutil.copyfile(compactionlog, os.path.join(logdir, n + "_compaction.log"))
+            if os.path.exists(name):
+                os.unlink(name)
+            if not is_win():
+                os.symlink(basedir, name)
+
+    def cql_connection(self, node, keyspace=None, user=None,
+                       password=None, compression=True, protocol_version=None, port=None, ssl_opts=None, **kwargs):
+
+        return self._create_session(node, keyspace, user, password, compression,
+                                    protocol_version, port=port, ssl_opts=ssl_opts, **kwargs)
+
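+    # unlike cql_connection, this pins the session to a single node by whitelisting its IP,
+    # so requests are only routed to that node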
+    def exclusive_cql_connection(self, node, keyspace=None, user=None,
+                                 password=None, compression=True, protocol_version=None, port=None, ssl_opts=None,
+                                 **kwargs):
+
+        node_ip = get_ip_from_node(node)
+        wlrr = WhiteListRoundRobinPolicy([node_ip])
+
+        return self._create_session(node, keyspace, user, password, compression,
+                                    protocol_version, port=port, ssl_opts=ssl_opts, load_balancing_policy=wlrr,
+                                    **kwargs)
+
+    def _create_session(self, node, keyspace, user, password, compression, protocol_version,
+                        port=None, ssl_opts=None, execution_profiles=None, **kwargs):
+        node_ip = get_ip_from_node(node)
+        if not port:
+            port = get_port_from_node(node)
+
+        if protocol_version is None:
+            protocol_version = 
get_eager_protocol_version(node.cluster.version())
+
+        if user is not None:
+            auth_provider = get_auth_provider(user=user, password=password)
+        else:
+            auth_provider = None
+
+        profiles = {EXEC_PROFILE_DEFAULT: make_execution_profile(**kwargs)
+                    } if not execution_profiles else execution_profiles
+
+        cluster = PyCluster([node_ip],
+                            auth_provider=auth_provider,
+                            compression=compression,
+                            protocol_version=protocol_version,
+                            port=port,
+                            ssl_options=ssl_opts,
+                            connect_timeout=15,
+                            allow_beta_protocol_version=True,
+                            execution_profiles=profiles)
+        session = cluster.connect(wait_for_all_pools=True)
+
+        if keyspace is not None:
+            session.set_keyspace(keyspace)
+
+        self.connections.append(session)
+        return session
+
+    def patient_cql_connection(self, node, keyspace=None,
+                               user=None, password=None, timeout=30, 
compression=True,
+                               protocol_version=None, port=None, 
ssl_opts=None, **kwargs):
+        """
+        Returns a connection after it stops throwing NoHostAvailables due to 
not being ready.
+
+        If the timeout is exceeded, the exception is raised.
+        """
+        if is_win():
+            timeout *= 2
+
+        expected_log_lines = ('Control connection failed to connect, shutting 
down Cluster:',
+                              '[control connection] Error connecting to ')
+        with log_filter('cassandra.cluster', expected_log_lines):
+            session = retry_till_success(
+                self.cql_connection,
+                node,
+                keyspace=keyspace,
+                user=user,
+                password=password,
+                timeout=timeout,
+                compression=compression,
+                protocol_version=protocol_version,
+                port=port,
+                ssl_opts=ssl_opts,
+                bypassed_exception=NoHostAvailable,
+                **kwargs
+            )
+
+        return session
+
+    def patient_exclusive_cql_connection(self, node, keyspace=None,
+                                         user=None, password=None, timeout=30, 
compression=True,
+                                         protocol_version=None, port=None, 
ssl_opts=None, **kwargs):
+        """
+        Returns a connection after it stops throwing NoHostAvailables due to 
not being ready.
+
+        If the timeout is exceeded, the exception is raised.
+        """
+        if is_win():
+            timeout *= 2
+
+        return retry_till_success(
+            self.exclusive_cql_connection,
+            node,
+            keyspace=keyspace,
+            user=user,
+            password=password,
+            timeout=timeout,
+            compression=compression,
+            protocol_version=protocol_version,
+            port=port,
+            ssl_opts=ssl_opts,
+            bypassed_exception=NoHostAvailable,
+            **kwargs
+        )
+
+    def check_logs_for_errors(self):
+        for node in self.cluster.nodelist():
+            errors = list(self.__filter_errors(
+                ['\n'.join(msg) for msg in node.grep_log_for_errors()]))
+            if len(errors) != 0:
+                for error in errors:
+                    print("Unexpected error in {node_name} log, error: 
\n{error}".format(node_name=node.name, error=error))
+                return True
+
+    def __filter_errors(self, errors):
+        """Filter errors, removing those that match self.ignore_log_patterns"""
+        if not hasattr(self, 'ignore_log_patterns'):
+            self.ignore_log_patterns = []
+        for e in errors:
+            for pattern in self.ignore_log_patterns:
+                if re.search(pattern, e):
+                    break
+            else:
+                yield e
+
+    def get_jfr_jvm_args(self):
+        """
+        @return The JVM arguments required for attaching flight recorder to a 
Java process.
+        """
+        return ["-XX:+UnlockCommercialFeatures", "-XX:+FlightRecorder"]
+
+    def start_jfr_recording(self, nodes):
+        """
+        Start Java flight recorder provided the cluster was started with the 
correct jvm arguments.
+        """
+        for node in nodes:
+            p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.start'],
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE)
+            stdout, stderr = p.communicate()
+            logger.debug(stdout)
+            logger.debug(stderr)
+
+    def dump_jfr_recording(self, nodes):
+        """
+        Save Java flight recorder results to file for analyzing with mission 
control.
+        """
+        for node in nodes:
+            p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.dump',
+                                  'recording=1', 
'filename=recording_{}.jfr'.format(node.address())],
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE)
+            stdout, stderr = p.communicate()
+            logger.debug(stdout)
+            logger.debug(stderr)
+
+    def supports_v5_protocol(self, cluster_version):
+        return cluster_version >= LooseVersion('4.0')
+
+    def cleanup_last_test_dir(self):
+        if os.path.exists(self.last_test_dir):
+            os.remove(self.last_test_dir)
+
+    def stop_active_log_watch(self):
+        """
+        Joins the log watching thread, which will then exit.
+        Should be called after each test, ideally after nodes are stopped but 
before cluster files are removed.
+
+        Can be called multiple times without error.
+        If not called, the log watching thread will remain running until the parent process exits.
+        """
+        self.log_watch_thread.join(timeout=60)
+
+    def cleanup_cluster(self):
+        with log_filter('cassandra'):  # quiet noise from driver when nodes 
start going down
+            if self.dtest_config.keep_test_dir:
+                
self.cluster.stop(gently=self.dtest_config.enable_jacoco_code_coverage)
+            else:
+                # when recording coverage the jvm has to exit normally
+                # or the coverage information is not written by the jacoco 
agent
+                # otherwise we can just kill the process
+                if self.dtest_config.enable_jacoco_code_coverage:
+                    self.cluster.stop(gently=True)
+
+                # Cleanup everything:
+                try:
+                    if self.log_watch_thread:
+                        self.stop_active_log_watch()
+                finally:
+                    logger.debug("removing ccm cluster {name} at: 
{path}".format(name=self.cluster.name,
+                                                                          
path=self.test_path))
+                    self.cluster.remove()
+
+                    logger.debug("clearing ssl stores from [{0}] 
directory".format(self.test_path))
+                    for filename in ('keystore.jks', 'truststore.jks', 
'ccm_node.cer'):
+                        try:
+                            os.remove(os.path.join(self.test_path, filename))
+                        except OSError as e:
+                            # ENOENT = no such file or directory
+                            assert e.errno == errno.ENOENT
+
+                    os.rmdir(self.test_path)
+                    self.cleanup_last_test_dir()
+
+    def cleanup_and_replace_cluster(self):
+        for con in self.connections:
+            con.cluster.shutdown()
+        self.connections = []
+
+        self.cleanup_cluster()
+        self.test_path = self.get_test_path()
+        self.initialize_cluster()
+
+    def init_default_config(self):
+        # the failure detector can be quite slow in such tests with quick 
start/stop
+        phi_values = {'phi_convict_threshold': 5}
+
+        timeout = 15000
+        if self.cluster_options is not None and len(self.cluster_options) > 0:
+            values = merge_dicts(self.cluster_options, phi_values)
+        else:
+            values = merge_dicts(phi_values, {
+                'read_request_timeout_in_ms': timeout,
+                'range_request_timeout_in_ms': timeout,
+                'write_request_timeout_in_ms': timeout,
+                'truncate_request_timeout_in_ms': timeout,
+                'request_timeout_in_ms': timeout
+            })
+
+        if self.setup_overrides is not None and 
len(self.setup_overrides.cluster_options) > 0:
+            values = merge_dicts(values, self.setup_overrides.cluster_options)
+
+        # No more Thrift in 4.0, and start_rpc doesn't exist anymore
+        if self.cluster.version() >= '4' and 'start_rpc' in values:
+            del values['start_rpc']
+
+        self.cluster.set_configuration_options(values)
+        logger.debug("Done setting configuration options:\n" + 
pprint.pformat(self.cluster._config_options, indent=4))
+
+    def maybe_setup_jacoco(self, cluster_name='test'):
+        """Set up JaCoCo code coverage support"""
+
+        if not self.dtest_config.enable_jacoco_code_coverage:
+            return
+
+        # use explicit agent and execfile locations
+        # or look for a cassandra build if they are not specified
+        agent_location = os.environ.get('JACOCO_AGENT_JAR',
+                                        
os.path.join(self.dtest_config.cassandra_dir, 'build/lib/jars/jacocoagent.jar'))
+        jacoco_execfile = os.environ.get('JACOCO_EXECFILE',
+                                         
os.path.join(self.dtest_config.cassandra_dir, 'build/jacoco/jacoco.exec'))
+
+        if os.path.isfile(agent_location):
+            logger.debug("Jacoco agent found at {}".format(agent_location))
+            with open(os.path.join(
+                    self.test_path, cluster_name, 'cassandra.in.sh'), 'w') as 
f:
+
+                f.write('JVM_OPTS="$JVM_OPTS 
-javaagent:{jar_path}=destfile={exec_file}"'
+                        .format(jar_path=agent_location, 
exec_file=jacoco_execfile))
+
+                if os.path.isfile(jacoco_execfile):
+                    logger.debug("Jacoco execfile found at {}, execution data 
will be appended".format(jacoco_execfile))
+                else:
+                    logger.debug("Jacoco execfile will be created at 
{}".format(jacoco_execfile))
+        else:
+            logger.debug("Jacoco agent not found or is not a file. Execution will not be recorded.")
+
+    def create_ccm_cluster(self, name):
+        logger.debug("cluster ccm directory: " + self.test_path)
+        version = self.dtest_config.cassandra_version
+
+        if version:
+            cluster = Cluster(self.test_path, name, cassandra_version=version)
+        else:
+            cluster = Cluster(self.test_path, name, 
cassandra_dir=self.dtest_config.cassandra_dir)
+
+        if self.dtest_config.use_vnodes:
+            cluster.set_configuration_options(values={'initial_token': None, 
'num_tokens': self.dtest_config.num_tokens})
+        else:
+            cluster.set_configuration_options(values={'num_tokens': None})
+
+        if self.dtest_config.use_off_heap_memtables:
+            
cluster.set_configuration_options(values={'memtable_allocation_type': 
'offheap_objects'})
+
+        cluster.set_datadir_count(self.dtest_config.data_dir_count)
+        cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', 
self.dtest_config.jemalloc_path)
+
+        return cluster
+
+    def set_cluster_log_levels(self):
+        """
+        The root logger gets configured in the fixture named 
fixture_logging_setup.
+        Based on the logging configuration options the user invoked pytest 
with,
+        that fixture sets the root logger to that configuration. We then 
ensure all
+        Cluster objects we work with "inherit" these logging settings (which 
we can
+        look up off the root logger)
+        """
+        if logging.root.level != logging.NOTSET:
+            log_level = logging.getLevelName(logging.root.level)
+        else:
+            log_level = logging.getLevelName(logging.INFO)
+        self.cluster.set_log_level(log_level)
+
+    def initialize_cluster(self):
+        """
+        This method is responsible for initializing and configuring a ccm
+        cluster for the next set of tests.  This can be called for two
+        different reasons:
+         * A class of tests is starting
+         * A test method failed/errored, so the cluster has been wiped
+
+        Subclasses that require custom initialization should generally
+        do so by overriding post_initialize_cluster().
+        """
+        # connections = []
+        # cluster_options = []
+        self.cluster = self.create_ccm_cluster(name='test')
+        self.init_default_config()
+        self.maybe_setup_jacoco()
+        self.set_cluster_log_levels()
+
+        # cls.init_config()
+        # write_last_test_file(cls.test_path, cls.cluster)
+
+        # cls.post_initialize_cluster()
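
As a point of reference for the connection helpers above, here is a minimal, hypothetical sketch (not part of this patch) of how a converted dtest exercises patient_cql_connection; the class name, keyspace and schema are illustrative only.

# Hypothetical usage sketch, not part of this patch; names and schema are
# illustrative only.
from dtest import Tester, create_ks


class TestSmoke(Tester):

    def test_simple_insert_and_read(self):
        cluster = self.cluster
        cluster.populate(1).start(wait_for_binary_proto=True)
        node1 = cluster.nodelist()[0]

        # patient_cql_connection retries through NoHostAvailable until the
        # node accepts client connections or the timeout is exceeded
        session = self.patient_cql_connection(node1, timeout=30)
        create_ks(session, 'ks', rf=1)
        session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)")
        session.execute("INSERT INTO t (k, v) VALUES (1, 10)")
        rows = list(session.execute("SELECT v FROM t WHERE k = 1"))
        assert rows[0].v == 10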

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/dtest_setup_overrides.py
----------------------------------------------------------------------
diff --git a/dtest_setup_overrides.py b/dtest_setup_overrides.py
new file mode 100644
index 0000000..6ea3258
--- /dev/null
+++ b/dtest_setup_overrides.py
@@ -0,0 +1,3 @@
+class DTestSetupOverrides:
+    def __init__(self):
+        self.cluster_options = []
\ No newline at end of file
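
The new class is a plain holder; below is a hedged sketch of how a test module might populate it. The exact fixture that hands the overrides to the dtest setup machinery is assumed here, only the class itself comes from this patch.

# Hypothetical sketch, not part of this patch: populating the overrides holder.
# How the object reaches init_default_config() (a pytest fixture is assumed)
# is outside this diff.
from dtest_setup_overrides import DTestSetupOverrides

overrides = DTestSetupOverrides()
overrides.cluster_options = {'hinted_handoff_enabled': 'false'}  # illustrative option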

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/global_row_key_cache_test.py
----------------------------------------------------------------------
diff --git a/global_row_key_cache_test.py b/global_row_key_cache_test.py
index c16f793..74c67f4 100644
--- a/global_row_key_cache_test.py
+++ b/global_row_key_cache_test.py
@@ -1,13 +1,16 @@
 import time
+import logging
 
 from cassandra.concurrent import execute_concurrent_with_args
 
-from dtest import Tester, debug, create_ks
+from dtest import Tester, create_ks, create_cf_simple
+
+logger = logging.getLogger(__name__)
 
 
 class TestGlobalRowKeyCache(Tester):
 
-    def functional_test(self):
+    def test_functional(self):
         cluster = self.cluster
         cluster.populate(3)
         node1 = cluster.nodelist()[0]
@@ -15,7 +18,7 @@ class TestGlobalRowKeyCache(Tester):
         for keycache_size in (0, 10):
             for rowcache_size in (0, 10):
                 cluster.stop()
-                debug("Testing with keycache size of %d MB, rowcache size of 
%d MB " %
+                logger.debug("Testing with keycache size of %d MB, rowcache 
size of %d MB " %
                       (keycache_size, rowcache_size))
                 keyspace_name = 'ks_%d_%d' % (keycache_size, rowcache_size)
 
@@ -29,14 +32,15 @@ class TestGlobalRowKeyCache(Tester):
 
                 cluster.start()
                 session = self.patient_cql_connection(node1)
-
                 create_ks(session, keyspace_name, rf=3)
 
                 session.set_keyspace(keyspace_name)
-                session.execute("CREATE TABLE test (k int PRIMARY KEY, v1 int, 
v2 int)")
-                session.execute("CREATE TABLE test_clustering (k int, v1 int, 
v2 int, PRIMARY KEY (k, v1))")
-                session.execute("CREATE TABLE test_counter (k int PRIMARY KEY, 
v1 counter)")
-                session.execute("CREATE TABLE test_counter_clustering (k int, 
v1 int, v2 counter, PRIMARY KEY (k, v1))")
+                create_cf_simple(session, 'test', "CREATE TABLE test (k int 
PRIMARY KEY, v1 int, v2 int)")
+                create_cf_simple(session, 'test_clustering',
+                                 "CREATE TABLE test_clustering (k int, v1 int, 
v2 int, PRIMARY KEY (k, v1))")
+                create_cf_simple(session, 'test_counter', "CREATE TABLE 
test_counter (k int PRIMARY KEY, v1 counter)")
+                create_cf_simple(session, 'test_counter_clustering',
+                                 "CREATE TABLE test_counter_clustering (k int, 
v1 int, v2 counter, PRIMARY KEY (k, v1))")
 
                 # insert 100 rows into each table
                 for cf in ('test', 'test_clustering'):
@@ -87,12 +91,12 @@ class TestGlobalRowKeyCache(Tester):
                 session.shutdown()
 
                 # let the data be written to the row/key caches.
-                debug("Letting caches be saved to disk")
+                logger.debug("Letting caches be saved to disk")
                 time.sleep(10)
-                debug("Stopping cluster")
+                logger.debug("Stopping cluster")
                 cluster.stop()
                 time.sleep(1)
-                debug("Starting cluster")
+                logger.debug("Starting cluster")
                 cluster.start()
                 time.sleep(5)  # read the data back from row and key caches
 
@@ -108,38 +112,38 @@ class TestGlobalRowKeyCache(Tester):
             rows = list(session.execute("SELECT * FROM %s" % (cf,)))
 
             # one row gets deleted each validation round
-            self.assertEquals(100 - (validation_round + 1), len(rows))
+            assert 100 - (validation_round + 1) == len(rows)
 
             # adjust enumeration start to account for row deletions
             for i, row in enumerate(sorted(rows), start=(validation_round + 
1)):
-                self.assertEquals(i, row.k)
-                self.assertEquals(i, row.v1)
+                assert i == row.k
+                assert i == row.v1
 
                 # updated rows will have different values
                 expected_value = validation_round if i < num_updates else i
-                self.assertEquals(expected_value, row.v2)
+                assert expected_value == row.v2
 
         # check values of counter tables
         rows = list(session.execute("SELECT * FROM test_counter"))
-        self.assertEquals(100, len(rows))
+        assert 100 == len(rows)
         for i, row in enumerate(sorted(rows)):
-            self.assertEquals(i, row.k)
+            assert i == row.k
 
             # updated rows will get incremented once each round
             expected_value = i
             if i < num_updates:
                 expected_value += validation_round + 1
 
-            self.assertEquals(expected_value, row.v1)
+            assert expected_value == row.v1
 
         rows = list(session.execute("SELECT * FROM test_counter_clustering"))
-        self.assertEquals(100, len(rows))
+        assert 100 == len(rows)
         for i, row in enumerate(sorted(rows)):
-            self.assertEquals(i, row.k)
-            self.assertEquals(i, row.v1)
+            assert i == row.k
+            assert i == row.v1
 
             expected_value = i
             if i < num_updates:
                 expected_value += validation_round + 1
 
-            self.assertEquals(expected_value, row.v2)
+            assert expected_value == row.v2
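
The assertion changes in this file are representative of the wider nose-to-pytest conversion; a tiny self-contained illustration of the pattern (data made up):

# Illustration only, not part of this patch: plain asserts replace the old
# unittest-style helpers, e.g. self.assertEquals(100, len(rows)) becomes:
rows = list(range(100))
assert 100 == len(rows)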

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/hintedhandoff_test.py
----------------------------------------------------------------------
diff --git a/hintedhandoff_test.py b/hintedhandoff_test.py
index 6345e3c..68d341e 100644
--- a/hintedhandoff_test.py
+++ b/hintedhandoff_test.py
@@ -1,11 +1,15 @@
 import os
 import time
+import pytest
+import logging
 
 from cassandra import ConsistencyLevel
 
-from dtest import DISABLE_VNODES, Tester, create_ks
+from dtest import Tester, create_ks
 from tools.data import create_c1c2_table, insert_c1c2, query_c1c2
-from tools.decorators import no_vnodes, since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 @since('3.0')
@@ -26,7 +30,7 @@ class TestHintedHandoffConfig(Tester):
         if config_options:
             cluster.set_configuration_options(values=config_options)
 
-        if DISABLE_VNODES:
+        if not self.dtest_config.use_vnodes:
             cluster.populate([2]).start()
         else:
             tokens = cluster.balanced_tokens(2)
@@ -39,7 +43,7 @@ class TestHintedHandoffConfig(Tester):
         Launch a nodetool command and check there is no error, return the 
result
         """
         out, err, _ = node.nodetool(cmd)
-        self.assertEqual('', err)
+        assert '' == err
         return out
 
     def _do_hinted_handoff(self, node1, node2, enabled, keyspace='ks'):
@@ -65,13 +69,13 @@ class TestHintedHandoffConfig(Tester):
 
         # Check node2 for all the keys: they should have been delivered via HH if enabled, and be missing if not
         session = self.patient_exclusive_cql_connection(node2, 
keyspace=keyspace)
-        for n in xrange(0, 100):
+        for n in range(0, 100):
             if enabled:
                 query_c1c2(session, n, ConsistencyLevel.ONE)
             else:
                 query_c1c2(session, n, ConsistencyLevel.ONE, 
tolerate_missing=True, must_be_missing=True)
 
-    def nodetool_test(self):
+    def test_nodetool(self):
         """
         Test various nodetool commands
         """
@@ -79,25 +83,25 @@ class TestHintedHandoffConfig(Tester):
 
         for node in node1, node2:
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is running', res.rstrip())
+            assert 'Hinted handoff is running' == res.rstrip()
 
             self._launch_nodetool_cmd(node, 'disablehandoff')
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is not running', res.rstrip())
+            assert 'Hinted handoff is not running' == res.rstrip()
 
             self._launch_nodetool_cmd(node, 'enablehandoff')
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is running', res.rstrip())
+            assert 'Hinted handoff is running' == res.rstrip()
 
             self._launch_nodetool_cmd(node, 'disablehintsfordc dc1')
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is running{}Data center dc1 is 
disabled'.format(os.linesep), res.rstrip())
+            assert 'Hinted handoff is running{}Data center dc1 is 
disabled'.format(os.linesep) == res.rstrip()
 
             self._launch_nodetool_cmd(node, 'enablehintsfordc dc1')
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is running', res.rstrip())
+            assert 'Hinted handoff is running' == res.rstrip()
 
-    def hintedhandoff_disabled_test(self):
+    def test_hintedhandoff_disabled(self):
         """
         Test global hinted handoff disabled
         """
@@ -105,11 +109,11 @@ class TestHintedHandoffConfig(Tester):
 
         for node in node1, node2:
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is not running', res.rstrip())
+            assert 'Hinted handoff is not running' == res.rstrip()
 
         self._do_hinted_handoff(node1, node2, False)
 
-    def hintedhandoff_enabled_test(self):
+    def test_hintedhandoff_enabled(self):
         """
         Test global hinted handoff enabled
         """
@@ -117,12 +121,12 @@ class TestHintedHandoffConfig(Tester):
 
         for node in node1, node2:
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is running', res.rstrip())
+            assert 'Hinted handoff is running' == res.rstrip()
 
         self._do_hinted_handoff(node1, node2, True)
 
     @since('4.0')
-    def hintedhandoff_setmaxwindow_test(self):
+    def test_hintedhandoff_setmaxwindow(self):
         """
         Test global hinted handoff against max_hint_window_in_ms update via 
nodetool
         """
@@ -130,18 +134,18 @@ class TestHintedHandoffConfig(Tester):
 
         for node in node1, node2:
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is running', res.rstrip())
+            assert 'Hinted handoff is running' == res.rstrip()
 
         res = self._launch_nodetool_cmd(node, 'getmaxhintwindow')
-        self.assertEqual('Current max hint window: 300000 ms', res.rstrip())
+        assert 'Current max hint window: 300000 ms' == res.rstrip()
         self._do_hinted_handoff(node1, node2, True)
         node1.start(wait_other_notice=True)
         self._launch_nodetool_cmd(node, 'setmaxhintwindow 1')
         res = self._launch_nodetool_cmd(node, 'getmaxhintwindow')
-        self.assertEqual('Current max hint window: 1 ms', res.rstrip())
+        assert 'Current max hint window: 1 ms' == res.rstrip()
         self._do_hinted_handoff(node1, node2, False, keyspace='ks2')
 
-    def hintedhandoff_dc_disabled_test(self):
+    def test_hintedhandoff_dc_disabled(self):
         """
         Test global hinted handoff enabled with the dc disabled
         """
@@ -150,11 +154,11 @@ class TestHintedHandoffConfig(Tester):
 
         for node in node1, node2:
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is running{}Data center dc1 is 
disabled'.format(os.linesep), res.rstrip())
+            assert 'Hinted handoff is running{}Data center dc1 is 
disabled'.format(os.linesep) == res.rstrip()
 
         self._do_hinted_handoff(node1, node2, False)
 
-    def hintedhandoff_dc_reenabled_test(self):
+    def test_hintedhandoff_dc_reenabled(self):
         """
         Test global hinted handoff enabled with the dc disabled first and then 
re-enabled
         """
@@ -163,20 +167,20 @@ class TestHintedHandoffConfig(Tester):
 
         for node in node1, node2:
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is running{}Data center dc1 is 
disabled'.format(os.linesep), res.rstrip())
+            assert 'Hinted handoff is running{}Data center dc1 is 
disabled'.format(os.linesep) == res.rstrip()
 
         for node in node1, node2:
             self._launch_nodetool_cmd(node, 'enablehintsfordc dc1')
             res = self._launch_nodetool_cmd(node, 'statushandoff')
-            self.assertEqual('Hinted handoff is running', res.rstrip())
+            assert 'Hinted handoff is running' == res.rstrip()
 
         self._do_hinted_handoff(node1, node2, True)
 
 
 class TestHintedHandoff(Tester):
 
-    @no_vnodes()
-    def hintedhandoff_decom_test(self):
+    @pytest.mark.no_vnodes
+    def test_hintedhandoff_decom(self):
         self.cluster.populate(4).start(wait_for_binary_proto=True)
         [node1, node2, node3, node4] = self.cluster.nodelist()
         session = self.patient_cql_connection(node1)
@@ -192,5 +196,5 @@ class TestHintedHandoff(Tester):
         node3.decommission(force=force)
 
         time.sleep(5)
-        for x in xrange(0, 100):
+        for x in range(0, 100):
             query_c1c2(session, x, ConsistencyLevel.ONE)
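
The module-level aliases at the top of this file (since = pytest.mark.since, logger = logging.getLogger(__name__)) replace the old nose decorators and debug() helper; a minimal sketch of the resulting pattern (class and test names are illustrative):

# Hypothetical sketch, not part of this patch: the marker/logging pattern used above.
import logging
import pytest

since = pytest.mark.since
logger = logging.getLogger(__name__)


@since('3.0')
class TestExampleMarkers:

    @pytest.mark.no_vnodes
    def test_something(self):
        # custom markers such as since/no_vnodes are assumed to be interpreted
        # by the project's conftest.py hooks
        logger.debug("running a marker-gated test")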

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/internode_ssl_test.py
----------------------------------------------------------------------
diff --git a/internode_ssl_test.py b/internode_ssl_test.py
index 4149d26..fff9985 100644
--- a/internode_ssl_test.py
+++ b/internode_ssl_test.py
@@ -1,11 +1,15 @@
-from dtest import Tester, debug, create_ks, create_cf
+import logging
+
+from dtest import Tester, create_ks, create_cf
 from tools.data import putget
 from tools.misc import generate_ssl_stores
 
+logger = logging.getLogger(__name__)
+
 
 class TestInternodeSSL(Tester):
 
-    def putget_with_internode_ssl_test(self):
+    def test_putget_with_internode_ssl(self):
         """
         Simple putget test with internode ssl enabled
         with default 'all' internode compression
@@ -13,7 +17,7 @@ class TestInternodeSSL(Tester):
         """
         self.__putget_with_internode_ssl_test('all')
 
-    def putget_with_internode_ssl_without_compression_test(self):
+    def test_putget_with_internode_ssl_without_compression(self):
         """
         Simple putget test with internode ssl enabled
         without internode compression
@@ -24,10 +28,10 @@ class TestInternodeSSL(Tester):
     def __putget_with_internode_ssl_test(self, internode_compression):
         cluster = self.cluster
 
-        debug("***using internode ssl***")
-        generate_ssl_stores(self.test_path)
+        logger.debug("***using internode ssl***")
+        generate_ssl_stores(self.fixture_dtest_setup.test_path)
         cluster.set_configuration_options({'internode_compression': 
internode_compression})
-        cluster.enable_internode_ssl(self.test_path)
+        cluster.enable_internode_ssl(self.fixture_dtest_setup.test_path)
 
         cluster.populate(3).start()
 

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/jmx_auth_test.py
----------------------------------------------------------------------
diff --git a/jmx_auth_test.py b/jmx_auth_test.py
index 99e7d80..99b227f 100644
--- a/jmx_auth_test.py
+++ b/jmx_auth_test.py
@@ -1,16 +1,19 @@
+import pytest
+import logging
 from distutils.version import LooseVersion
 
 from ccmlib.node import ToolError
-
 from dtest import Tester
-from tools.decorators import since
 from tools.jmxutils import apply_jmx_authentication
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 
 @since('3.6')
 class TestJMXAuth(Tester):
 
-    def basic_auth_test(self):
+    def test_basic_auth(self):
         """
         Some basic smoke testing of JMX authentication and authorization.
         Uses nodetool as a means of exercising the JMX interface as 
JolokiaAgent
@@ -29,21 +32,21 @@ class TestJMXAuth(Tester):
         session.execute("GRANT DESCRIBE ON ALL MBEANS TO jmx_user")
         session.execute("CREATE ROLE test WITH LOGIN=true and 
PASSWORD='abc123'")
 
-        with self.assertRaisesRegexp(ToolError, 
self.authentication_fail_message(node, 'baduser')):
+        with pytest.raises(ToolError, 
match=self.authentication_fail_message(node, 'baduser')):
             node.nodetool('-u baduser -pw abc123 gossipinfo')
 
-        with self.assertRaisesRegexp(ToolError, 
self.authentication_fail_message(node, 'test')):
+        with pytest.raises(ToolError, 
match=self.authentication_fail_message(node, 'test')):
             node.nodetool('-u test -pw badpassword gossipinfo')
 
-        with self.assertRaisesRegexp(ToolError, "Required key 'username' is 
missing"):
+        with pytest.raises(ToolError, match="Required key 'username' is missing"):
             node.nodetool('gossipinfo')
 
         # role must have LOGIN attribute
-        with self.assertRaisesRegexp(ToolError, 'jmx_user is not permitted to 
log in'):
+        with pytest.raises(ToolError, match='jmx_user is not permitted to log in'):
             node.nodetool('-u jmx_user -pw 321cba gossipinfo')
 
         # test doesn't yet have any privileges on the necessary JMX resources
-        with self.assertRaisesRegexp(ToolError, 'Access Denied'):
+        with pytest.raises(ToolError, match='Access Denied'):
             node.nodetool('-u test -pw abc123 gossipinfo')
 
         session.execute("GRANT jmx_user TO test")
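
pytest.raises takes a match keyword whose value is treated as a regular expression and searched against the string form of the raised exception; a minimal, self-contained illustration (unrelated to Cassandra):

# Minimal illustration of pytest.raises(..., match=...); not part of this patch.
import pytest


def test_match_is_a_regex_search():
    with pytest.raises(ValueError, match="bad .* value"):
        raise ValueError("bad config value")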

