http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/dtest.py ---------------------------------------------------------------------- diff --git a/dtest.py b/dtest.py index 25c52e3..aad9e58 100644 --- a/dtest.py +++ b/dtest.py @@ -1,50 +1,30 @@ -from __future__ import with_statement - -import ConfigParser +import configparser import copy -import errno -import glob import logging import os -import pprint import re -import shutil -import signal import subprocess import sys -import tempfile -import thread import threading import time import traceback -import types -import unittest.case -from collections import OrderedDict -from subprocess import CalledProcessError -from unittest import TestCase - +import pytest import cassandra import ccmlib.repository -from cassandra import ConsistencyLevel + +from subprocess import CalledProcessError + +from flaky import flaky + +from cassandra import ConsistencyLevel, OperationTimedOut from cassandra.auth import PlainTextAuthProvider -from cassandra.cluster import Cluster as PyCluster -from cassandra.cluster import NoHostAvailable -from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT -from cassandra.policies import RetryPolicy, WhiteListRoundRobinPolicy -from ccmlib.cluster import Cluster -from ccmlib.cluster_factory import ClusterFactory +from cassandra.cluster import ExecutionProfile +from cassandra.policies import RetryPolicy, RoundRobinPolicy from ccmlib.common import get_version_from_build, is_win +from ccmlib.node import ToolError, TimeoutError from distutils.version import LooseVersion -from nose.exc import SkipTest -from nose.tools import assert_greater_equal -from six import print_ +from tools.misc import retry_till_success -from plugins.dtestconfig import _CONFIG as CONFIG -# We don't want test files to know about the plugins module, so we import -# constants here and re-export them. 
-from plugins.dtestconfig import GlobalConfigObject -from tools.context import log_filter -from tools.funcutils import merge_dicts LOG_SAVED_DIR = "logs" try: @@ -57,52 +37,14 @@ LAST_LOG = os.path.join(LOG_SAVED_DIR, "last") LAST_TEST_DIR = 'last_test_dir' DEFAULT_DIR = './' -config = ConfigParser.RawConfigParser() +config = configparser.RawConfigParser() if len(config.read(os.path.expanduser('~/.cassandra-dtest'))) > 0: if config.has_option('main', 'default_dir'): DEFAULT_DIR = os.path.expanduser(config.get('main', 'default_dir')) -CASSANDRA_DIR = os.environ.get('CASSANDRA_DIR', DEFAULT_DIR) - -NO_SKIP = os.environ.get('SKIP', '').lower() in ('no', 'false') -DEBUG = os.environ.get('DEBUG', '').lower() in ('yes', 'true') -TRACE = os.environ.get('TRACE', '').lower() in ('yes', 'true') -KEEP_LOGS = os.environ.get('KEEP_LOGS', '').lower() in ('yes', 'true') -KEEP_TEST_DIR = os.environ.get('KEEP_TEST_DIR', '').lower() in ('yes', 'true') -PRINT_DEBUG = os.environ.get('PRINT_DEBUG', '').lower() in ('yes', 'true') -OFFHEAP_MEMTABLES = os.environ.get('OFFHEAP_MEMTABLES', '').lower() in ('yes', 'true') -NUM_TOKENS = os.environ.get('NUM_TOKENS', '256') -RECORD_COVERAGE = os.environ.get('RECORD_COVERAGE', '').lower() in ('yes', 'true') -IGNORE_REQUIRE = os.environ.get('IGNORE_REQUIRE', '').lower() in ('yes', 'true') -DATADIR_COUNT = os.environ.get('DATADIR_COUNT', '3') -ENABLE_ACTIVE_LOG_WATCHING = os.environ.get('ENABLE_ACTIVE_LOG_WATCHING', '').lower() in ('yes', 'true') -RUN_STATIC_UPGRADE_MATRIX = os.environ.get('RUN_STATIC_UPGRADE_MATRIX', '').lower() in ('yes', 'true') - -# default values for configuration from configuration plugin -_default_config = GlobalConfigObject( - vnodes=True, -) - -if CONFIG is None: - CONFIG = _default_config -DISABLE_VNODES = not CONFIG.vnodes - - -if os.environ.get('DISABLE_VNODES', '').lower() in ('yes', 'true'): - print 'DISABLE_VNODES environment variable deprecated. Use `./run_dtests.py --vnodes false` instead.' - - -CURRENT_TEST = "" - -logging.basicConfig(filename=os.path.join(LOG_SAVED_DIR, "dtest.log"), - filemode='w', - format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', - datefmt='%H:%M:%S', - level=logging.DEBUG) +RUN_STATIC_UPGRADE_MATRIX = os.environ.get('RUN_STATIC_UPGRADE_MATRIX', '').lower() in ('yes', 'true') -LOG = logging.getLogger('dtest') -# set python-driver log level to INFO by default for dtest -logging.getLogger('cassandra').setLevel(logging.INFO) +logger = logging.getLogger(__name__) def get_sha(repo_dir): @@ -111,11 +53,13 @@ def get_sha(repo_dir): prefix = 'github:apache/' local_repo_location = os.environ.get('LOCAL_GIT_REPO') if local_repo_location is not None: - prefix = 'local:{}:'.format(local_repo_location) # local: slugs take the form 'local:/some/path/to/cassandra/:branch_name_or_sha' + prefix = 'local:{}:'.format(local_repo_location) + # local: slugs take the form 'local:/some/path/to/cassandra/:branch_name_or_sha' return "{}{}".format(prefix, output) except CalledProcessError as e: - if re.search('Not a git repository', e.message) is not None: - # we tried to get a sha, but repo_dir isn't a git repo. No big deal, must just be working from a non-git install. + if re.search('Not a git repository', str(e)) is not None: + # we tried to get a sha, but repo_dir isn't a git repo. No big deal, must just be + # working from a non-git install.
return None else: # git call failed for some unknown reason @@ -137,33 +81,12 @@ if _cassandra_version_slug: CASSANDRA_VERSION_FROM_BUILD = get_version_from_build(ccm_repo_cache_dir) CASSANDRA_GITREF = get_sha(ccm_repo_cache_dir) # will be set None when not a git repo else: - CASSANDRA_VERSION_FROM_BUILD = get_version_from_build(CASSANDRA_DIR) - CASSANDRA_GITREF = get_sha(CASSANDRA_DIR) - + CASSANDRA_VERSION_FROM_BUILD = LooseVersion("4.0") # todo kjkjkj + CASSANDRA_GITREF = "" + #CASSANDRA_VERSION_FROM_BUILD = get_version_from_build(self.dtest_config.cassandra_dir) + #CASSANDRA_GITREF = get_sha(dtest_config.cassandra_dir) -# Determine the location of the libjemalloc jar so that we can specify it -# through environment variables when start Cassandra. This reduces startup -# time, making the dtests run faster. -def find_libjemalloc(): - if is_win(): - # let the normal bat script handle finding libjemalloc - return "" - this_dir = os.path.dirname(os.path.realpath(__file__)) - script = os.path.join(this_dir, "findlibjemalloc.sh") - try: - p = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - if stderr or not stdout: - return "-" # tells C* not to look for libjemalloc - else: - return stdout - except Exception as exc: - print "Failed to run script to prelocate libjemalloc ({}): {}".format(script, exc) - return "" - - -CASSANDRA_LIBJEMALLOC = find_libjemalloc() # copy the initial environment variables so we can reset them later: initial_environment = copy.deepcopy(os.environ) @@ -172,40 +95,7 @@ class DtestTimeoutError(Exception): pass -def reset_environment_vars(): - os.environ.clear() - os.environ.update(initial_environment) - - -def warning(msg): - LOG.warning("{} - {}".format(CURRENT_TEST, msg)) - if PRINT_DEBUG: - print "WARN: " + msg - - -def debug(msg): - LOG.debug("{} - {}".format(CURRENT_TEST, msg)) - if PRINT_DEBUG: - print msg - - -debug("Python driver version in use: {}".format(cassandra.__version__)) - - -def retry_till_success(fun, *args, **kwargs): - timeout = kwargs.pop('timeout', 60) - bypassed_exception = kwargs.pop('bypassed_exception', Exception) - - deadline = time.time() + timeout - while True: - try: - return fun(*args, **kwargs) - except bypassed_exception: - if time.time() > deadline: - raise - else: - # brief pause before next attempt - time.sleep(0.25) +logger.debug("Python driver version in use: {}".format(cassandra.__version__)) class FlakyRetryPolicy(RetryPolicy): @@ -219,21 +109,21 @@ class FlakyRetryPolicy(RetryPolicy): def on_read_timeout(self, *args, **kwargs): if kwargs['retry_num'] < self.max_retries: - debug("Retrying read after timeout. Attempt #" + str(kwargs['retry_num'])) + logger.debug("Retrying read after timeout. Attempt #" + str(kwargs['retry_num'])) return (self.RETRY, None) else: return (self.RETHROW, None) def on_write_timeout(self, *args, **kwargs): if kwargs['retry_num'] < self.max_retries: - debug("Retrying write after timeout. Attempt #" + str(kwargs['retry_num'])) + logger.debug("Retrying write after timeout. Attempt #" + str(kwargs['retry_num'])) return (self.RETRY, None) else: return (self.RETHROW, None) def on_unavailable(self, *args, **kwargs): if kwargs['retry_num'] < self.max_retries: - debug("Retrying request after UE. Attempt #" + str(kwargs['retry_num'])) + logger.debug("Retrying request after UE. 
Attempt #" + str(kwargs['retry_num'])) return (self.RETRY, None) else: return (self.RETHROW, None) @@ -261,8 +151,18 @@ class Runner(threading.Thread): i = i + 1 def stop(self): + if self.__stopped: + return + self.__stopped = True - self.join() + # pytests may appear to hang forever waiting for cluster tear down. are all driver session objects shutdown? + # to debug hang you can add the following at the top of the test + # import faulthandler + # faulthandler.enable() + # + # and then when the hang occurs send a SIGABRT to the pytest process (e.g. kill -SIGABRT <pytest_pid>) + # this will print a python thread dump of all currently alive threads + self.join(timeout=30) if self.__error is not None: raise self.__error @@ -272,344 +172,103 @@ class Runner(threading.Thread): def make_execution_profile(retry_policy=FlakyRetryPolicy(), consistency_level=ConsistencyLevel.ONE, **kwargs): - return ExecutionProfile(retry_policy=retry_policy, - consistency_level=consistency_level, - **kwargs) - - -class Tester(TestCase): - - maxDiff = None - allow_log_errors = False # scan the log of each node for errors after every test. - cluster_options = None - - def set_node_to_current_version(self, node): - version = os.environ.get('CASSANDRA_VERSION') - cdir = CASSANDRA_DIR - - if version: - node.set_install_dir(version=version) - else: - node.set_install_dir(install_dir=cdir) - - def init_config(self): - init_default_config(self.cluster, self.cluster_options) - - def setUp(self): - self.set_current_tst_name() - kill_windows_cassandra_procs() - maybe_cleanup_cluster_from_last_test_file() - - self.test_path = get_test_path() - self.cluster = create_ccm_cluster(self.test_path, name='test') - - self.maybe_begin_active_log_watch() - maybe_setup_jacoco(self.test_path) - - self.init_config() - write_last_test_file(self.test_path, self.cluster) - - set_log_levels(self.cluster) - self.connections = [] - self.runners = [] - - # this is intentionally spelled 'tst' instead of 'test' to avoid - # making unittest think it's a test method - def set_current_tst_name(self): - global CURRENT_TEST - CURRENT_TEST = self.id() - - def maybe_begin_active_log_watch(self): - if ENABLE_ACTIVE_LOG_WATCHING: - if not self.allow_log_errors: - self.begin_active_log_watch() - - def begin_active_log_watch(self): - """ - Calls into ccm to start actively watching logs. - - In the event that errors are seen in logs, ccm will call back to _log_error_handler. - - When the cluster is no longer in use, stop_active_log_watch should be called to end log watching. - (otherwise a 'daemon' thread will (needlessly) run until the process exits). - """ - # log watching happens in another thread, but we want it to halt the main - # thread's execution, which we have to do by registering a signal handler - signal.signal(signal.SIGINT, self._catch_interrupt) - self._log_watch_thread = self.cluster.actively_watch_logs_for_error(self._log_error_handler, interval=0.25) - - def _log_error_handler(self, errordata): - """ - Callback handler used in conjunction with begin_active_log_watch. - When called, prepares exception instance, then will indirectly - cause _catch_interrupt to be called, which can raise the exception in the main - program thread. - - @param errordata is a dictonary mapping node name to failure list. 
- """ - # in some cases self.allow_log_errors may get set after proactive log checking has been enabled - # so we need to double-check first thing before proceeding - if self.allow_log_errors: - return - - reportable_errordata = OrderedDict() - - for nodename, errors in errordata.items(): - filtered_errors = list(self.__filter_errors(['\n'.join(msg) for msg in errors])) - if len(filtered_errors) is not 0: - reportable_errordata[nodename] = filtered_errors + if 'load_balancing_policy' in kwargs: + return ExecutionProfile(retry_policy=retry_policy, + consistency_level=consistency_level, + **kwargs) + else: + return ExecutionProfile(retry_policy=retry_policy, + consistency_level=consistency_level, + load_balancing_policy=RoundRobinPolicy(), + **kwargs) - # no errors worthy of halting the test - if not reportable_errordata: - return - message = "Errors seen in logs for: {nodes}".format(nodes=", ".join(reportable_errordata.keys())) - for nodename, errors in reportable_errordata.items(): - for error in errors: - message += "\n{nodename}: {error}".format(nodename=nodename, error=error) +def running_in_docker(): + return os.path.isfile('/.dockerenv') - try: - debug('Errors were just seen in logs, ending test (if not ending already)!') - print_("Error details: \n{message}".format(message=message)) - self.test_is_ending # will raise AttributeError if not present - except AttributeError: - self.test_is_ending = True - self.exit_with_exception = AssertionError("Log error encountered during active log scanning, see stdout") - # thread.interrupt_main will SIGINT in the main thread, which we can - # catch to raise an exception with useful information - thread.interrupt_main() +def cleanup_docker_environment_before_test_execution(): + """ + perform a bunch of system cleanup operations, like kill any instances that might be + hanging around incorrectly from a previous run, sync the disk, and clear swap. + Ideally we would also drop the page cache, but as docker isn't running in privileged + mode there is no way for us to do this. """ - Finds files matching the glob pattern specified as argument on - the given keyspace in all nodes + # attempt to wack all existing running Cassandra processes forcefully to get us into a clean state + p_kill = subprocess.Popen('ps aux | grep -ie CassandraDaemon | grep java | awk \'{print $2}\' | xargs kill -9', + shell=True) + p_kill.wait(timeout=10) + + # explicitly call "sync" to flush everything that might be pending from a previous test + # so tests are less likely to hit a very slow fsync during the test by starting from a 'known' state + # note: to mitigate this further the docker image is mounting /tmp as a volume, which gives + # us an ext4 mount which should talk directly to the underlying device on the host, skipping + # the aufs pain that we get with anything else running in the docker image. Originally, + # I had a timeout of 120 seconds (2 minutes), 300 seconds (5 minutes) but sync was still occasionally timing out. + p_sync = subprocess.Popen('sudo /bin/sync', shell=True) + p_sync.wait(timeout=600) + + # turn swap off and back on to make sure it's fully cleared if anything happened to swap + # from a previous test run + p_swap = subprocess.Popen('sudo /sbin/swapoff -a && sudo /sbin/swapon -a', shell=True) + p_swap.wait(timeout=60) + + +def test_failure_due_to_timeout(err, *args): """ + check if we should rerun a test with the flaky plugin or not. 
+ for now, only run if we failed the test for one of the following + three exceptions: cassandra.OperationTimedOut, ccm.node.ToolError, + and ccm.node.TimeoutError. + + - cassandra.OperationTimedOut will be thrown when a cql query made through + the python-driver times out. + - ccm.node.ToolError will be thrown when an invocation of a "tool" + (in the case of dtests this will almost always be invoking stress). + - ccm.node.TimeoutError will be thrown when a blocking ccm operation + on an individual node times out. In most cases this tends to be something + like watch_log_for hitting the timeout before the desired pattern is seen + in the node's logs. + + if we failed for one of these reasons - and we're running in docker - run + the same "cleanup" logic we run before test execution and test setup begins + and for good measure introduce a 2 second sleep. why 2 seconds? because it's + magic :) - ideally this gets the environment back into a good state and makes + the rerun of flaky tests likely to succeed if they failed in the first place + due to environmental issues. + """ + if issubclass(err[0], OperationTimedOut) or issubclass(err[0], ToolError) or issubclass(err[0], TimeoutError): + if running_in_docker(): + cleanup_docker_environment_before_test_execution() + time.sleep(2) + return True + else: + return False - def glob_data_dirs(self, path, ks="ks"): - result = [] - for node in self.cluster.nodelist(): - for data_dir in node.data_directories(): - ks_dir = os.path.join(data_dir, ks, path) - result.extend(glob.glob(ks_dir)) - return result - - def _catch_interrupt(self, signal, frame): - """ - Signal handler for registering on SIGINT. - - If called will look for a stored exception and raise it to abort test. - If a stored exception is not present, this handler has likely caught a
- """ - try: - # check if we have a persisted exception to fail with - raise self.exit_with_exception - except AttributeError: - # looks like this was just a plain CTRL-C event - raise KeyboardInterrupt() - - def copy_logs(self, cluster, directory=None, name=None): - """Copy the current cluster's log files somewhere, by default to LOG_SAVED_DIR with a name of 'last'""" - if directory is None: - directory = LOG_SAVED_DIR - if name is None: - name = LAST_LOG - else: - name = os.path.join(directory, name) - if not os.path.exists(directory): - os.mkdir(directory) - logs = [(node.name, node.logfilename(), node.debuglogfilename(), node.gclogfilename(), node.compactionlogfilename()) - for node in self.cluster.nodes.values()] - if len(logs) is not 0: - basedir = str(int(time.time() * 1000)) + '_' + self.id() - logdir = os.path.join(directory, basedir) - os.mkdir(logdir) - for n, log, debuglog, gclog, compactionlog in logs: - if os.path.exists(log): - self.assertGreaterEqual(os.path.getsize(log), 0) - shutil.copyfile(log, os.path.join(logdir, n + ".log")) - if os.path.exists(debuglog): - self.assertGreaterEqual(os.path.getsize(debuglog), 0) - shutil.copyfile(debuglog, os.path.join(logdir, n + "_debug.log")) - if os.path.exists(gclog): - self.assertGreaterEqual(os.path.getsize(gclog), 0) - shutil.copyfile(gclog, os.path.join(logdir, n + "_gc.log")) - if os.path.exists(compactionlog): - self.assertGreaterEqual(os.path.getsize(compactionlog), 0) - shutil.copyfile(compactionlog, os.path.join(logdir, n + "_compaction.log")) - if os.path.exists(name): - os.unlink(name) - if not is_win(): - os.symlink(basedir, name) - - def cql_connection(self, node, keyspace=None, user=None, - password=None, compression=True, protocol_version=None, port=None, ssl_opts=None, **kwargs): - - return self._create_session(node, keyspace, user, password, compression, - protocol_version, port=port, ssl_opts=ssl_opts, **kwargs) - - def exclusive_cql_connection(self, node, keyspace=None, user=None, - password=None, compression=True, protocol_version=None, port=None, ssl_opts=None, **kwargs): - - node_ip = get_ip_from_node(node) - wlrr = WhiteListRoundRobinPolicy([node_ip]) - - return self._create_session(node, keyspace, user, password, compression, - protocol_version, port=port, ssl_opts=ssl_opts, load_balancing_policy=wlrr, **kwargs) - - def _create_session(self, node, keyspace, user, password, compression, protocol_version, - port=None, ssl_opts=None, execution_profiles=None, **kwargs): - node_ip = get_ip_from_node(node) - if not port: - port = get_port_from_node(node) - - if protocol_version is None: - protocol_version = get_eager_protocol_version(node.cluster.version()) - - if user is not None: - auth_provider = get_auth_provider(user=user, password=password) - else: - auth_provider = None - - profiles = {EXEC_PROFILE_DEFAULT: make_execution_profile(**kwargs) - } if not execution_profiles else execution_profiles - - cluster = PyCluster([node_ip], - auth_provider=auth_provider, - compression=compression, - protocol_version=protocol_version, - port=port, - ssl_options=ssl_opts, - connect_timeout=10, - allow_beta_protocol_version=True, - execution_profiles=profiles) - session = cluster.connect(wait_for_all_pools=True) - - if keyspace is not None: - session.set_keyspace(keyspace) - - self.connections.append(session) - return session - - def patient_cql_connection(self, node, keyspace=None, - user=None, password=None, timeout=30, compression=True, - protocol_version=None, port=None, ssl_opts=None, **kwargs): - """ - Returns a 
connection after it stops throwing NoHostAvailables due to not being ready. - - If the timeout is exceeded, the exception is raised. - """ - if is_win(): - timeout *= 2 - - expected_log_lines = ('Control connection failed to connect, shutting down Cluster:', '[control connection] Error connecting to ') - with log_filter('cassandra.cluster', expected_log_lines): - session = retry_till_success( - self.cql_connection, - node, - keyspace=keyspace, - user=user, - password=password, - timeout=timeout, - compression=compression, - protocol_version=protocol_version, - port=port, - ssl_opts=ssl_opts, - bypassed_exception=NoHostAvailable, - **kwargs - ) - - return session - - def patient_exclusive_cql_connection(self, node, keyspace=None, - user=None, password=None, timeout=30, compression=True, - protocol_version=None, port=None, ssl_opts=None, **kwargs): - """ - Returns a connection after it stops throwing NoHostAvailables due to not being ready. - - If the timeout is exceeded, the exception is raised. - """ - if is_win(): - timeout *= 2 - - return retry_till_success( - self.exclusive_cql_connection, - node, - keyspace=keyspace, - user=user, - password=password, - timeout=timeout, - compression=compression, - protocol_version=protocol_version, - port=port, - ssl_opts=ssl_opts, - bypassed_exception=NoHostAvailable, - **kwargs - ) - - @classmethod - def tearDownClass(cls): - reset_environment_vars() - if os.path.exists(LAST_TEST_DIR): - with open(LAST_TEST_DIR) as f: - test_path = f.readline().strip('\n') - name = f.readline() - try: - cluster = ClusterFactory.load(test_path, name) - # Avoid waiting too long for node to be marked down - if KEEP_TEST_DIR: - cluster.stop(gently=RECORD_COVERAGE) - else: - cluster.remove() - os.rmdir(test_path) - except IOError: - # after a restart, /tmp will be emptied so we'll get an IOError when loading the old cluster here - pass - try: - os.remove(LAST_TEST_DIR) - except IOError: - # Ignore - see comment above - pass - def tearDown(self): - # test_is_ending prevents active log watching from being able to interrupt the test - # which we don't want to happen once tearDown begins - self.test_is_ending = True +@flaky(rerun_filter=test_failure_due_to_timeout) +class Tester: - reset_environment_vars() + def __getattribute__(self, name): + try: + return object.__getattribute__(self, name) + except AttributeError: + fixture_dtest_setup = object.__getattribute__(self, 'fixture_dtest_setup') + return object.__getattribute__(fixture_dtest_setup , name) - for con in self.connections: - con.cluster.shutdown() + @pytest.fixture(scope='function', autouse=True) + def set_dtest_setup_on_function(self, fixture_dtest_setup, fixture_dtest_config): + self.fixture_dtest_setup = fixture_dtest_setup + self.dtest_config = fixture_dtest_config - for runner in self.runners: - try: - runner.stop() - except Exception: - pass + def set_node_to_current_version(self, node): + version = os.environ.get('CASSANDRA_VERSION') - failed = did_fail() - try: - if not self.allow_log_errors and self.check_logs_for_errors(): - failed = True - raise AssertionError('Unexpected error in log, see stdout') - finally: - try: - # save the logs for inspection - if failed or KEEP_LOGS: - self.copy_logs(self.cluster) - except Exception as e: - print "Error saving log:", str(e) - finally: - log_watch_thread = getattr(self, '_log_watch_thread', None) - cleanup_cluster(self.cluster, self.test_path, log_watch_thread) - - def check_logs_for_errors(self): - for node in self.cluster.nodelist(): - errors = 
list(self.__filter_errors( - ['\n'.join(msg) for msg in node.grep_log_for_errors()])) - if len(errors) is not 0: - for error in errors: - print_("Unexpected error in {node_name} log, error: \n{error}".format(node_name=node.name, error=error)) - return True + if version: + node.set_install_dir(version=version) + else: + node.set_install_dir(install_dir=self.dtest_config.cassandra_dir) + os.environ.set('CASSANDRA_DIR', self.dtest_config.cassandra_dir) def go(self, func): runner = Runner(func) @@ -617,57 +276,6 @@ class Tester(TestCase): runner.start() return runner - def skip(self, msg): - if not NO_SKIP: - raise SkipTest(msg) - - def __filter_errors(self, errors): - """Filter errors, removing those that match self.ignore_log_patterns""" - if not hasattr(self, 'ignore_log_patterns'): - self.ignore_log_patterns = [] - for e in errors: - for pattern in self.ignore_log_patterns: - if re.search(pattern, e): - break - else: - yield e - - # Disable docstrings printing in nosetest output - def shortDescription(self): - return None - - def get_jfr_jvm_args(self): - """ - @return The JVM arguments required for attaching flight recorder to a Java process. - """ - return ["-XX:+UnlockCommercialFeatures", "-XX:+FlightRecorder"] - - def start_jfr_recording(self, nodes): - """ - Start Java flight recorder provided the cluster was started with the correct jvm arguments. - """ - for node in nodes: - p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.start'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - debug(stdout) - debug(stderr) - - def dump_jfr_recording(self, nodes): - """ - Save Java flight recorder results to file for analyzing with mission control. - """ - for node in nodes: - p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.dump', - 'recording=1', 'filename=recording_{}.jfr'.format(node.address())], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - debug(stdout) - debug(stderr) - - def get_eager_protocol_version(cassandra_version): """ Returns the highest protocol version accepted @@ -690,7 +298,7 @@ def create_cf(session, name, key_type="varchar", speculative_retry=None, read_re additional_columns = "" if columns is not None: - for k, v in columns.items(): + for k, v in list(columns.items()): additional_columns = "{}, {} {}".format(additional_columns, k, v) if additional_columns == "": @@ -714,20 +322,42 @@ def create_cf(session, name, key_type="varchar", speculative_retry=None, read_re if compact_storage: query += ' AND COMPACT STORAGE' - session.execute(query) - time.sleep(0.2) - + try: + retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut) + except cassandra.AlreadyExists: + logger.warn('AlreadyExists executing create cf query \'%s\'' % query) + session.cluster.control_connection.wait_for_schema_agreement(wait_time=120) + #Going to ignore OperationTimedOut from create CF, so need to validate it was indeed created + session.execute('SELECT * FROM %s LIMIT 1' % name); + +def create_cf_simple(session, name, query): + try: + retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut) + except cassandra.AlreadyExists: + logger.warn('AlreadyExists executing create cf query \'%s\'' % query) + session.cluster.control_connection.wait_for_schema_agreement(wait_time=120) + #Going to ignore OperationTimedOut from create CF, so need to validate it was indeed created + session.execute('SELECT * FROM %s LIMIT 1' % name) 
def create_ks(session, name, rf): query = 'CREATE KEYSPACE %s WITH replication={%s}' - if isinstance(rf, types.IntType): + if isinstance(rf, int): # we assume simpleStrategy - session.execute(query % (name, "'class':'SimpleStrategy', 'replication_factor':%d" % rf)) + query = query % (name, "'class':'SimpleStrategy', 'replication_factor':%d" % rf) else: - assert_greater_equal(len(rf), 0, "At least one datacenter/rf pair is needed") + assert len(rf) >= 0, "At least one datacenter/rf pair is needed" # we assume networkTopologyStrategy - options = (', ').join(['\'%s\':%d' % (d, r) for d, r in rf.iteritems()]) - session.execute(query % (name, "'class':'NetworkTopologyStrategy', %s" % options)) + options = (', ').join(['\'%s\':%d' % (d, r) for d, r in rf.items()]) + query = query % (name, "'class':'NetworkTopologyStrategy', %s" % options) + + try: + retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut) + except cassandra.AlreadyExists: + logger.warn('AlreadyExists executing create ks query \'%s\'' % query) + + session.cluster.control_connection.wait_for_schema_agreement(wait_time=120) + #Also validates it was indeed created even though we ignored OperationTimedOut + #Might happen some of the time because CircleCI disk IO is unreliable and hangs randomly session.execute('USE {}'.format(name)) @@ -774,301 +404,13 @@ def kill_windows_cassandra_procs(): pass else: if (pinfo['name'] == 'java.exe' and '-Dcassandra' in pinfo['cmdline']): - print 'Found running cassandra process with pid: ' + str(pinfo['pid']) + '. Killing.' + print('Found running cassandra process with pid: ' + str(pinfo['pid']) + '. Killing.') psutil.Process(pinfo['pid']).kill() except ImportError: - debug("WARN: psutil not installed. Cannot detect and kill " + logger.debug("WARN: psutil not installed. 
Cannot detect and kill " "running cassandra processes - you may see cascading dtest failures.") -def get_test_path(): - test_path = tempfile.mkdtemp(prefix='dtest-') - - # ccm on cygwin needs absolute path to directory - it crosses from cygwin space into - # regular Windows space on wmic calls which will otherwise break pathing - if sys.platform == "cygwin": - process = subprocess.Popen(["cygpath", "-m", test_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - test_path = process.communicate()[0].rstrip() - - return test_path - - -# nose will discover this as a test, so we manually make it not a test -get_test_path.__test__ = False - - -def create_ccm_cluster(test_path, name): - debug("cluster ccm directory: " + test_path) - version = os.environ.get('CASSANDRA_VERSION') - cdir = CASSANDRA_DIR - - if version: - cluster = Cluster(test_path, name, cassandra_version=version) - else: - cluster = Cluster(test_path, name, cassandra_dir=cdir) - - if DISABLE_VNODES: - cluster.set_configuration_options(values={'num_tokens': None}) - else: - cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': NUM_TOKENS}) - - if OFFHEAP_MEMTABLES: - cluster.set_configuration_options(values={'memtable_allocation_type': 'offheap_objects'}) - - cluster.set_datadir_count(DATADIR_COUNT) - cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', CASSANDRA_LIBJEMALLOC) - - return cluster - - -def cleanup_cluster(cluster, test_path, log_watch_thread=None): - with log_filter('cassandra'): # quiet noise from driver when nodes start going down - if KEEP_TEST_DIR: - cluster.stop(gently=RECORD_COVERAGE) - else: - # when recording coverage the jvm has to exit normally - # or the coverage information is not written by the jacoco agent - # otherwise we can just kill the process - if RECORD_COVERAGE: - cluster.stop(gently=True) - - # Cleanup everything: - try: - if log_watch_thread: - stop_active_log_watch(log_watch_thread) - finally: - debug("removing ccm cluster {name} at: {path}".format(name=cluster.name, path=test_path)) - cluster.remove() - - debug("clearing ssl stores from [{0}] directory".format(test_path)) - for filename in ('keystore.jks', 'truststore.jks', 'ccm_node.cer'): - try: - os.remove(os.path.join(test_path, filename)) - except OSError as e: - # once we port to py3, which has better reporting for exceptions raised while - # handling other excpetions, we should just assert e.errno == errno.ENOENT - if e.errno != errno.ENOENT: # ENOENT = no such file or directory - raise - - os.rmdir(test_path) - cleanup_last_test_dir() - - -def cleanup_last_test_dir(): - if os.path.exists(LAST_TEST_DIR): - os.remove(LAST_TEST_DIR) - - -def stop_active_log_watch(log_watch_thread): - """ - Joins the log watching thread, which will then exit. - Should be called after each test, ideally after nodes are stopped but before cluster files are removed. - - Can be called multiple times without error. - If not called, log watching thread will remain running until the parent process exits. 
- """ - log_watch_thread.join(timeout=60) - - -def maybe_cleanup_cluster_from_last_test_file(): - # cleaning up if a previous execution didn't trigger tearDown (which - # can happen if it is interrupted by KeyboardInterrupt) - if os.path.exists(LAST_TEST_DIR): - with open(LAST_TEST_DIR) as f: - test_path = f.readline().strip('\n') - name = f.readline() - try: - cluster = ClusterFactory.load(test_path, name) - # Avoid waiting too long for node to be marked down - cleanup_cluster(cluster, test_path) - except IOError: - # after a restart, /tmp will be emptied so we'll get an IOError when loading the old cluster here - pass - - -def init_default_config(cluster, cluster_options): - # the failure detector can be quite slow in such tests with quick start/stop - phi_values = {'phi_convict_threshold': 5} - - timeout = 10000 - if cluster_options is not None: - values = merge_dicts(cluster_options, phi_values) - else: - values = merge_dicts(phi_values, { - 'read_request_timeout_in_ms': timeout, - 'range_request_timeout_in_ms': timeout, - 'write_request_timeout_in_ms': timeout, - 'truncate_request_timeout_in_ms': timeout, - 'request_timeout_in_ms': timeout - }) - - # No more thrift in 4.0, and start_rpc doesn't exists anymore - if cluster.version() >= '4' and 'start_rpc' in values: - del values['start_rpc'] - - cluster.set_configuration_options(values) - debug("Done setting configuration options:\n" + pprint.pformat(cluster._config_options, indent=4)) - - -def write_last_test_file(test_path, cluster): - with open(LAST_TEST_DIR, 'w') as f: - f.write(test_path + '\n') - f.write(cluster.name) - - -def set_log_levels(cluster): - if DEBUG: - cluster.set_log_level("DEBUG") - if TRACE: - cluster.set_log_level("TRACE") - - if os.environ.get('DEBUG', 'no').lower() not in ('no', 'false', 'yes', 'true'): - classes_to_debug = os.environ.get('DEBUG').split(":") - cluster.set_log_level('DEBUG', None if len(classes_to_debug) == 0 else classes_to_debug) - - if os.environ.get('TRACE', 'no').lower() not in ('no', 'false', 'yes', 'true'): - classes_to_trace = os.environ.get('TRACE').split(":") - cluster.set_log_level('TRACE', None if len(classes_to_trace) == 0 else classes_to_trace) - - -def maybe_setup_jacoco(test_path, cluster_name='test'): - """Setup JaCoCo code coverage support""" - - if not RECORD_COVERAGE: - return - - # use explicit agent and execfile locations - # or look for a cassandra build if they are not specified - cdir = CASSANDRA_DIR - - agent_location = os.environ.get('JACOCO_AGENT_JAR', os.path.join(cdir, 'build/lib/jars/jacocoagent.jar')) - jacoco_execfile = os.environ.get('JACOCO_EXECFILE', os.path.join(cdir, 'build/jacoco/jacoco.exec')) - - if os.path.isfile(agent_location): - debug("Jacoco agent found at {}".format(agent_location)) - with open(os.path.join( - test_path, cluster_name, 'cassandra.in.sh'), 'w') as f: - - f.write('JVM_OPTS="$JVM_OPTS -javaagent:{jar_path}=destfile={exec_file}"' - .format(jar_path=agent_location, exec_file=jacoco_execfile)) - - if os.path.isfile(jacoco_execfile): - debug("Jacoco execfile found at {}, execution data will be appended".format(jacoco_execfile)) - else: - debug("Jacoco execfile will be created at {}".format(jacoco_execfile)) - else: - debug("Jacoco agent not found or is not file. 
Execution will not be recorded.") - - -def did_fail(): - if sys.exc_info() == (None, None, None): - return False - - exc_class, _, _ = sys.exc_info() - return not issubclass(exc_class, unittest.case.SkipTest) - - -class ReusableClusterTester(Tester): - """ - A Tester designed for reusing the same cluster across multiple - test methods. This makes test suites with many small tests run - much, much faster. However, there are a couple of downsides: - - First, test setup and teardown must be diligent about cleaning - up any data or schema elements that may interfere with other - tests. - - Second, errors triggered by one test method may cascade - into other test failures. In an attempt to limit this, the - cluster will be restarted if a test fails or an exception is - caught. However, there may still be undetected problems in - Cassandra that cause cascading failures. - """ - - test_path = None - cluster = None - cluster_options = None - - @classmethod - def setUpClass(cls): - kill_windows_cassandra_procs() - maybe_cleanup_cluster_from_last_test_file() - cls.initialize_cluster() - - def setUp(self): - self.set_current_tst_name() - self.connections = [] - - # TODO enable active log watching - # This needs to happen in setUp() and not setUpClass() so that individual - # test methods can set allow_log_errors and so that error handling - # only fails a single test method instead of the entire class. - # The problem with this is that ccm doesn't yet support stopping the - # active log watcher -- it runs until the cluster is destroyed. Since - # we reuse the same cluster, this doesn't work for us. - - def tearDown(self): - # test_is_ending prevents active log watching from being able to interrupt the test - self.test_is_ending = True - - failed = did_fail() - try: - if not self.allow_log_errors and self.check_logs_for_errors(): - failed = True - raise AssertionError('Unexpected error in log, see stdout') - finally: - try: - # save the logs for inspection - if failed or KEEP_LOGS: - self.copy_logs(self.cluster) - except Exception as e: - print "Error saving log:", str(e) - finally: - reset_environment_vars() - if failed: - cleanup_cluster(self.cluster, self.test_path) - kill_windows_cassandra_procs() - self.initialize_cluster() - - @classmethod - def initialize_cluster(cls): - """ - This method is responsible for initializing and configuring a ccm - cluster for the next set of tests. This can be called for two - different reasons: - * A class of tests is starting - * A test method failed/errored, so the cluster has been wiped - - Subclasses that require custom initialization should generally - do so by overriding post_initialize_cluster(). - """ - cls.test_path = get_test_path() - cls.cluster = create_ccm_cluster(cls.test_path, name='test') - cls.init_config() - - maybe_setup_jacoco(cls.test_path) - cls.init_config() - write_last_test_file(cls.test_path, cls.cluster) - set_log_levels(cls.cluster) - - cls.post_initialize_cluster() - - @classmethod - def post_initialize_cluster(cls): - """ - This method is called after the ccm cluster has been created - and default config options have been applied. Any custom - initialization for a test class should generally be done - here in order to correctly handle cluster restarts after - test method failures. - """ - pass - - @classmethod - def init_config(cls): - init_default_config(cls.cluster, cls.cluster_options) - - class MultiError(Exception): """ Extends Exception to provide reporting multiple exceptions at once. 
@@ -1109,24 +451,20 @@ def run_scenarios(scenarios, handler, deferred_exceptions=tuple()): tracebacks = [] for i, scenario in enumerate(scenarios, 1): - debug("running scenario {}/{}: {}".format(i, len(scenarios), scenario)) + logger.debug("running scenario {}/{}: {}".format(i, len(scenarios), scenario)) try: handler(scenario) except deferred_exceptions as e: tracebacks.append(traceback.format_exc(sys.exc_info())) - errors.append(type(e)('encountered {} {} running scenario:\n {}\n'.format(e.__class__.__name__, e.message, scenario))) - debug("scenario {}/{} encountered a deferrable exception, continuing".format(i, len(scenarios))) + errors.append(type(e)('encountered {} {} running scenario:\n {}\n'.format(e.__class__.__name__, str(e), scenario))) + logger.debug("scenario {}/{} encountered a deferrable exception, continuing".format(i, len(scenarios))) except Exception as e: # catch-all for any exceptions not intended to be deferred tracebacks.append(traceback.format_exc(sys.exc_info())) - errors.append(type(e)('encountered {} {} running scenario:\n {}\n'.format(e.__class__.__name__, e.message, scenario))) - debug("scenario {}/{} encountered a non-deferrable exception, aborting".format(i, len(scenarios))) + errors.append(type(e)('encountered {} {} running scenario:\n {}\n'.format(e.__class__.__name__, str(e), scenario))) + logger.debug("scenario {}/{} encountered a non-deferrable exception, aborting".format(i, len(scenarios))) raise MultiError(errors, tracebacks) if errors: raise MultiError(errors, tracebacks) - - -def supports_v5_protocol(cluster_version): - return cluster_version >= LooseVersion('4.0')
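The new schema helpers in dtest.py above (create_ks, create_cf and create_cf_simple) all follow the same retry pattern: run the DDL through retry_till_success while treating cassandra.OperationTimedOut as retryable, wait for schema agreement, then probe the table to confirm it really exists. A minimal sketch of that pattern is below; it assumes a driver session such as one returned by patient_cql_connection, and the helper name and table name are illustrative only, not part of this patch:

    import cassandra
    import logging

    from tools.misc import retry_till_success  # same helper dtest.py now imports

    logger = logging.getLogger(__name__)

    def create_table_with_retry(session, query, table_name):
        # Re-run the DDL until it succeeds, swallowing driver timeouts for up
        # to 120 seconds, as create_cf()/create_cf_simple() do above.
        try:
            retry_till_success(session.execute, query=query, timeout=120,
                               bypassed_exception=cassandra.OperationTimedOut)
        except cassandra.AlreadyExists:
            # A previous attempt that timed out may still have applied the schema.
            logger.warn('AlreadyExists executing create cf query \'%s\'' % query)
        # Make sure all nodes agree on the new schema before the test proceeds.
        session.cluster.control_connection.wait_for_schema_agreement(wait_time=120)
        # Since an OperationTimedOut may have been ignored, verify the table
        # was actually created before returning.
        session.execute('SELECT * FROM %s LIMIT 1' % table_name)

Tests that define their own tables (for example global_row_key_cache_test.py below) pass the full CREATE TABLE statement to create_cf_simple rather than building it, which keeps the same timeout and validation behaviour.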
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/dtest_setup.py ---------------------------------------------------------------------- diff --git a/dtest_setup.py b/dtest_setup.py new file mode 100644 index 0000000..87014f4 --- /dev/null +++ b/dtest_setup.py @@ -0,0 +1,498 @@ +import pytest +import glob +import os +import shutil +import time +import logging +import re +import tempfile +import subprocess +import sys +import errno +import pprint +from collections import OrderedDict + +from cassandra.cluster import Cluster as PyCluster +from cassandra.cluster import NoHostAvailable +from cassandra.cluster import EXEC_PROFILE_DEFAULT +from cassandra.policies import WhiteListRoundRobinPolicy +from ccmlib.common import get_version_from_build, is_win +from ccmlib.cluster import Cluster + +from dtest import (get_ip_from_node, make_execution_profile, get_auth_provider, get_port_from_node, + get_eager_protocol_version) +from distutils.version import LooseVersion + +from tools.context import log_filter +from tools.funcutils import merge_dicts + +logger = logging.getLogger(__name__) + + +def retry_till_success(fun, *args, **kwargs): + timeout = kwargs.pop('timeout', 60) + bypassed_exception = kwargs.pop('bypassed_exception', Exception) + + deadline = time.time() + timeout + while True: + try: + return fun(*args, **kwargs) + except bypassed_exception: + if time.time() > deadline: + raise + else: + # brief pause before next attempt + time.sleep(0.25) + + +class DTestSetup: + def __init__(self, dtest_config=None, setup_overrides=None): + self.dtest_config = dtest_config + self.setup_overrides = setup_overrides + self.ignore_log_patterns = [] + self.cluster = None + self.cluster_options = [] + self.replacement_node = None + self.allow_log_errors = False + self.connections = [] + + self.log_saved_dir = "logs" + try: + os.mkdir(self.log_saved_dir) + except OSError: + pass + + self.last_log = os.path.join(self.log_saved_dir, "last") + self.test_path = self.get_test_path() + self.enable_for_jolokia = False + self.subprocs = [] + self.log_watch_thread = None + self.last_test_dir = "last_test_dir" + self.jvm_args = [] + + def get_test_path(self): + test_path = tempfile.mkdtemp(prefix='dtest-') + + # ccm on cygwin needs absolute path to directory - it crosses from cygwin space into + # regular Windows space on wmic calls which will otherwise break pathing + if sys.platform == "cygwin": + process = subprocess.Popen(["cygpath", "-m", test_path], stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + test_path = process.communicate()[0].rstrip() + + return test_path + + def glob_data_dirs(self, path, ks="ks"): + result = [] + for node in self.cluster.nodelist(): + for data_dir in node.data_directories(): + ks_dir = os.path.join(data_dir, ks, path) + result.extend(glob.glob(ks_dir)) + return result + + def begin_active_log_watch(self): + """ + Calls into ccm to start actively watching logs. + + In the event that errors are seen in logs, ccm will call back to _log_error_handler. + + When the cluster is no longer in use, stop_active_log_watch should be called to end log watching. + (otherwise a 'daemon' thread will (needlessly) run until the process exits). + """ + self._log_watch_thread = self.cluster.actively_watch_logs_for_error(self._log_error_handler, interval=0.25) + + def _log_error_handler(self, errordata): + """ + Callback handler used in conjunction with begin_active_log_watch. 
+ When called, prepares exception instance, we will use pytest.fail + to kill the current test being executed and mark it as failed + + @param errordata is a dictonary mapping node name to failure list. + """ + # in some cases self.allow_log_errors may get set after proactive log checking has been enabled + # so we need to double-check first thing before proceeding + if self.allow_log_errors: + return + + reportable_errordata = OrderedDict() + + for nodename, errors in list(errordata.items()): + filtered_errors = list(self.__filter_errors(['\n'.join(msg) for msg in errors])) + if len(filtered_errors) is not 0: + reportable_errordata[nodename] = filtered_errors + + # no errors worthy of halting the test + if not reportable_errordata: + return + + message = "Errors seen in logs for: {nodes}".format(nodes=", ".join(list(reportable_errordata.keys()))) + for nodename, errors in list(reportable_errordata.items()): + for error in errors: + message += "\n{nodename}: {error}".format(nodename=nodename, error=error) + + logger.debug('Errors were just seen in logs, ending test (if not ending already)!') + pytest.fail("Error details: \n{message}".format(message=message)) + + def copy_logs(self, directory=None, name=None): + """Copy the current cluster's log files somewhere, by default to LOG_SAVED_DIR with a name of 'last'""" + if directory is None: + directory = self.log_saved_dir + if name is None: + name = self.last_log + else: + name = os.path.join(directory, name) + if not os.path.exists(directory): + os.mkdir(directory) + logs = [(node.name, node.logfilename(), node.debuglogfilename(), node.gclogfilename(), + node.compactionlogfilename()) + for node in list(self.cluster.nodes.values())] + if len(logs) is not 0: + basedir = str(int(time.time() * 1000)) + '_' + str(id(self)) + logdir = os.path.join(directory, basedir) + os.mkdir(logdir) + for n, log, debuglog, gclog, compactionlog in logs: + if os.path.exists(log): + assert os.path.getsize(log) >= 0 + shutil.copyfile(log, os.path.join(logdir, n + ".log")) + if os.path.exists(debuglog): + assert os.path.getsize(debuglog) >= 0 + shutil.copyfile(debuglog, os.path.join(logdir, n + "_debug.log")) + if os.path.exists(gclog): + assert os.path.getsize(gclog) >= 0 + shutil.copyfile(gclog, os.path.join(logdir, n + "_gc.log")) + if os.path.exists(compactionlog): + assert os.path.getsize(compactionlog) >= 0 + shutil.copyfile(compactionlog, os.path.join(logdir, n + "_compaction.log")) + if os.path.exists(name): + os.unlink(name) + if not is_win(): + os.symlink(basedir, name) + + def cql_connection(self, node, keyspace=None, user=None, + password=None, compression=True, protocol_version=None, port=None, ssl_opts=None, **kwargs): + + return self._create_session(node, keyspace, user, password, compression, + protocol_version, port=port, ssl_opts=ssl_opts, **kwargs) + + def exclusive_cql_connection(self, node, keyspace=None, user=None, + password=None, compression=True, protocol_version=None, port=None, ssl_opts=None, + **kwargs): + + node_ip = get_ip_from_node(node) + wlrr = WhiteListRoundRobinPolicy([node_ip]) + + return self._create_session(node, keyspace, user, password, compression, + protocol_version, port=port, ssl_opts=ssl_opts, load_balancing_policy=wlrr, + **kwargs) + + def _create_session(self, node, keyspace, user, password, compression, protocol_version, + port=None, ssl_opts=None, execution_profiles=None, **kwargs): + node_ip = get_ip_from_node(node) + if not port: + port = get_port_from_node(node) + + if protocol_version is None: + protocol_version = 
get_eager_protocol_version(node.cluster.version()) + + if user is not None: + auth_provider = get_auth_provider(user=user, password=password) + else: + auth_provider = None + + profiles = {EXEC_PROFILE_DEFAULT: make_execution_profile(**kwargs) + } if not execution_profiles else execution_profiles + + cluster = PyCluster([node_ip], + auth_provider=auth_provider, + compression=compression, + protocol_version=protocol_version, + port=port, + ssl_options=ssl_opts, + connect_timeout=15, + allow_beta_protocol_version=True, + execution_profiles=profiles) + session = cluster.connect(wait_for_all_pools=True) + + if keyspace is not None: + session.set_keyspace(keyspace) + + self.connections.append(session) + return session + + def patient_cql_connection(self, node, keyspace=None, + user=None, password=None, timeout=30, compression=True, + protocol_version=None, port=None, ssl_opts=None, **kwargs): + """ + Returns a connection after it stops throwing NoHostAvailables due to not being ready. + + If the timeout is exceeded, the exception is raised. + """ + if is_win(): + timeout *= 2 + + expected_log_lines = ('Control connection failed to connect, shutting down Cluster:', + '[control connection] Error connecting to ') + with log_filter('cassandra.cluster', expected_log_lines): + session = retry_till_success( + self.cql_connection, + node, + keyspace=keyspace, + user=user, + password=password, + timeout=timeout, + compression=compression, + protocol_version=protocol_version, + port=port, + ssl_opts=ssl_opts, + bypassed_exception=NoHostAvailable, + **kwargs + ) + + return session + + def patient_exclusive_cql_connection(self, node, keyspace=None, + user=None, password=None, timeout=30, compression=True, + protocol_version=None, port=None, ssl_opts=None, **kwargs): + """ + Returns a connection after it stops throwing NoHostAvailables due to not being ready. + + If the timeout is exceeded, the exception is raised. + """ + if is_win(): + timeout *= 2 + + return retry_till_success( + self.exclusive_cql_connection, + node, + keyspace=keyspace, + user=user, + password=password, + timeout=timeout, + compression=compression, + protocol_version=protocol_version, + port=port, + ssl_opts=ssl_opts, + bypassed_exception=NoHostAvailable, + **kwargs + ) + + def check_logs_for_errors(self): + for node in self.cluster.nodelist(): + errors = list(self.__filter_errors( + ['\n'.join(msg) for msg in node.grep_log_for_errors()])) + if len(errors) is not 0: + for error in errors: + print("Unexpected error in {node_name} log, error: \n{error}".format(node_name=node.name, error=error)) + return True + + def __filter_errors(self, errors): + """Filter errors, removing those that match self.ignore_log_patterns""" + if not hasattr(self, 'ignore_log_patterns'): + self.ignore_log_patterns = [] + for e in errors: + for pattern in self.ignore_log_patterns: + if re.search(pattern, e): + break + else: + yield e + + def get_jfr_jvm_args(self): + """ + @return The JVM arguments required for attaching flight recorder to a Java process. + """ + return ["-XX:+UnlockCommercialFeatures", "-XX:+FlightRecorder"] + + def start_jfr_recording(self, nodes): + """ + Start Java flight recorder provided the cluster was started with the correct jvm arguments. 
+ """ + for node in nodes: + p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.start'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + logger.debug(stdout) + logger.debug(stderr) + + def dump_jfr_recording(self, nodes): + """ + Save Java flight recorder results to file for analyzing with mission control. + """ + for node in nodes: + p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.dump', + 'recording=1', 'filename=recording_{}.jfr'.format(node.address())], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + logger.debug(stdout) + logger.debug(stderr) + + def supports_v5_protocol(self, cluster_version): + return cluster_version >= LooseVersion('4.0') + + def cleanup_last_test_dir(self): + if os.path.exists(self.last_test_dir): + os.remove(self.last_test_dir) + + def stop_active_log_watch(self): + """ + Joins the log watching thread, which will then exit. + Should be called after each test, ideally after nodes are stopped but before cluster files are removed. + + Can be called multiple times without error. + If not called, log watching thread will remain running until the parent process exits. + """ + self.log_watch_thread.join(timeout=60) + + def cleanup_cluster(self): + with log_filter('cassandra'): # quiet noise from driver when nodes start going down + if self.dtest_config.keep_test_dir: + self.cluster.stop(gently=self.dtest_config.enable_jacoco_code_coverage) + else: + # when recording coverage the jvm has to exit normally + # or the coverage information is not written by the jacoco agent + # otherwise we can just kill the process + if self.dtest_config.enable_jacoco_code_coverage: + self.cluster.stop(gently=True) + + # Cleanup everything: + try: + if self.log_watch_thread: + self.stop_active_log_watch() + finally: + logger.debug("removing ccm cluster {name} at: {path}".format(name=self.cluster.name, + path=self.test_path)) + self.cluster.remove() + + logger.debug("clearing ssl stores from [{0}] directory".format(self.test_path)) + for filename in ('keystore.jks', 'truststore.jks', 'ccm_node.cer'): + try: + os.remove(os.path.join(self.test_path, filename)) + except OSError as e: + # ENOENT = no such file or directory + assert e.errno == errno.ENOENT + + os.rmdir(self.test_path) + self.cleanup_last_test_dir() + + def cleanup_and_replace_cluster(self): + for con in self.connections: + con.cluster.shutdown() + self.connections = [] + + self.cleanup_cluster() + self.test_path = self.get_test_path() + self.initialize_cluster() + + def init_default_config(self): + # the failure detector can be quite slow in such tests with quick start/stop + phi_values = {'phi_convict_threshold': 5} + + timeout = 15000 + if self.cluster_options is not None and len(self.cluster_options) > 0: + values = merge_dicts(self.cluster_options, phi_values) + else: + values = merge_dicts(phi_values, { + 'read_request_timeout_in_ms': timeout, + 'range_request_timeout_in_ms': timeout, + 'write_request_timeout_in_ms': timeout, + 'truncate_request_timeout_in_ms': timeout, + 'request_timeout_in_ms': timeout + }) + + if self.setup_overrides is not None and len(self.setup_overrides.cluster_options) > 0: + values = merge_dicts(values, self.setup_overrides.cluster_options) + + # No more thrift in 4.0, and start_rpc doesn't exists anymore + if self.cluster.version() >= '4' and 'start_rpc' in values: + del values['start_rpc'] + + self.cluster.set_configuration_options(values) + logger.debug("Done setting configuration options:\n" + 
pprint.pformat(self.cluster._config_options, indent=4)) + + def maybe_setup_jacoco(self, cluster_name='test'): + """Setup JaCoCo code coverage support""" + + if not self.dtest_config.enable_jacoco_code_coverage: + return + + # use explicit agent and execfile locations + # or look for a cassandra build if they are not specified + agent_location = os.environ.get('JACOCO_AGENT_JAR', + os.path.join(self.dtest_config.cassandra_dir, 'build/lib/jars/jacocoagent.jar')) + jacoco_execfile = os.environ.get('JACOCO_EXECFILE', + os.path.join(self.dtest_config.cassandra_dir, 'build/jacoco/jacoco.exec')) + + if os.path.isfile(agent_location): + logger.debug("Jacoco agent found at {}".format(agent_location)) + with open(os.path.join( + self.test_path, cluster_name, 'cassandra.in.sh'), 'w') as f: + + f.write('JVM_OPTS="$JVM_OPTS -javaagent:{jar_path}=destfile={exec_file}"' + .format(jar_path=agent_location, exec_file=jacoco_execfile)) + + if os.path.isfile(jacoco_execfile): + logger.debug("Jacoco execfile found at {}, execution data will be appended".format(jacoco_execfile)) + else: + logger.debug("Jacoco execfile will be created at {}".format(jacoco_execfile)) + else: + logger.debug("Jacoco agent not found or is not file. Execution will not be recorded.") + + def create_ccm_cluster(self, name): + logger.debug("cluster ccm directory: " + self.test_path) + version = self.dtest_config.cassandra_version + + if version: + cluster = Cluster(self.test_path, name, cassandra_version=version) + else: + cluster = Cluster(self.test_path, name, cassandra_dir=self.dtest_config.cassandra_dir) + + if self.dtest_config.use_vnodes: + cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': self.dtest_config.num_tokens}) + else: + cluster.set_configuration_options(values={'num_tokens': None}) + + if self.dtest_config.use_off_heap_memtables: + cluster.set_configuration_options(values={'memtable_allocation_type': 'offheap_objects'}) + + cluster.set_datadir_count(self.dtest_config.data_dir_count) + cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', self.dtest_config.jemalloc_path) + + return cluster + + def set_cluster_log_levels(self): + """ + The root logger gets configured in the fixture named fixture_logging_setup. + Based on the logging configuration options the user invoked pytest with, + that fixture sets the root logger to that configuration. We then ensure all + Cluster objects we work with "inherit" these logging settings (which we can + lookup off the root logger) + """ + if logging.root.level != 'NOTSET': + log_level = logging.getLevelName(logging.INFO) + else: + log_level = logging.root.level + self.cluster.set_log_level(log_level) + + def initialize_cluster(self): + """ + This method is responsible for initializing and configuring a ccm + cluster for the next set of tests. This can be called for two + different reasons: + * A class of tests is starting + * A test method failed/errored, so the cluster has been wiped + + Subclasses that require custom initialization should generally + do so by overriding post_initialize_cluster(). 
+ """ + # connections = [] + # cluster_options = [] + self.cluster = self.create_ccm_cluster(name='test') + self.init_default_config() + self.maybe_setup_jacoco() + self.set_cluster_log_levels() + + # cls.init_config() + # write_last_test_file(cls.test_path, cls.cluster) + + # cls.post_initialize_cluster() http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/dtest_setup_overrides.py ---------------------------------------------------------------------- diff --git a/dtest_setup_overrides.py b/dtest_setup_overrides.py new file mode 100644 index 0000000..6ea3258 --- /dev/null +++ b/dtest_setup_overrides.py @@ -0,0 +1,3 @@ +class DTestSetupOverrides: + def __init__(self): + self.cluster_options = [] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/global_row_key_cache_test.py ---------------------------------------------------------------------- diff --git a/global_row_key_cache_test.py b/global_row_key_cache_test.py index c16f793..74c67f4 100644 --- a/global_row_key_cache_test.py +++ b/global_row_key_cache_test.py @@ -1,13 +1,16 @@ import time +import logging from cassandra.concurrent import execute_concurrent_with_args -from dtest import Tester, debug, create_ks +from dtest import Tester, create_ks, create_cf_simple + +logger = logging.getLogger(__name__) class TestGlobalRowKeyCache(Tester): - def functional_test(self): + def test_functional(self): cluster = self.cluster cluster.populate(3) node1 = cluster.nodelist()[0] @@ -15,7 +18,7 @@ class TestGlobalRowKeyCache(Tester): for keycache_size in (0, 10): for rowcache_size in (0, 10): cluster.stop() - debug("Testing with keycache size of %d MB, rowcache size of %d MB " % + logger.debug("Testing with keycache size of %d MB, rowcache size of %d MB " % (keycache_size, rowcache_size)) keyspace_name = 'ks_%d_%d' % (keycache_size, rowcache_size) @@ -29,14 +32,15 @@ class TestGlobalRowKeyCache(Tester): cluster.start() session = self.patient_cql_connection(node1) - create_ks(session, keyspace_name, rf=3) session.set_keyspace(keyspace_name) - session.execute("CREATE TABLE test (k int PRIMARY KEY, v1 int, v2 int)") - session.execute("CREATE TABLE test_clustering (k int, v1 int, v2 int, PRIMARY KEY (k, v1))") - session.execute("CREATE TABLE test_counter (k int PRIMARY KEY, v1 counter)") - session.execute("CREATE TABLE test_counter_clustering (k int, v1 int, v2 counter, PRIMARY KEY (k, v1))") + create_cf_simple(session, 'test', "CREATE TABLE test (k int PRIMARY KEY, v1 int, v2 int)") + create_cf_simple(session, 'test_clustering', + "CREATE TABLE test_clustering (k int, v1 int, v2 int, PRIMARY KEY (k, v1))") + create_cf_simple(session, 'test_counter', "CREATE TABLE test_counter (k int PRIMARY KEY, v1 counter)") + create_cf_simple(session, 'test_counter_clustering', + "CREATE TABLE test_counter_clustering (k int, v1 int, v2 counter, PRIMARY KEY (k, v1))") # insert 100 rows into each table for cf in ('test', 'test_clustering'): @@ -87,12 +91,12 @@ class TestGlobalRowKeyCache(Tester): session.shutdown() # let the data be written to the row/key caches. 
- debug("Letting caches be saved to disk") + logger.debug("Letting caches be saved to disk") time.sleep(10) - debug("Stopping cluster") + logger.debug("Stopping cluster") cluster.stop() time.sleep(1) - debug("Starting cluster") + logger.debug("Starting cluster") cluster.start() time.sleep(5) # read the data back from row and key caches @@ -108,38 +112,38 @@ class TestGlobalRowKeyCache(Tester): rows = list(session.execute("SELECT * FROM %s" % (cf,))) # one row gets deleted each validation round - self.assertEquals(100 - (validation_round + 1), len(rows)) + assert 100 - (validation_round + 1) == len(rows) # adjust enumeration start to account for row deletions for i, row in enumerate(sorted(rows), start=(validation_round + 1)): - self.assertEquals(i, row.k) - self.assertEquals(i, row.v1) + assert i == row.k + assert i == row.v1 # updated rows will have different values expected_value = validation_round if i < num_updates else i - self.assertEquals(expected_value, row.v2) + assert expected_value == row.v2 # check values of counter tables rows = list(session.execute("SELECT * FROM test_counter")) - self.assertEquals(100, len(rows)) + assert 100 == len(rows) for i, row in enumerate(sorted(rows)): - self.assertEquals(i, row.k) + assert i == row.k # updated rows will get incremented once each round expected_value = i if i < num_updates: expected_value += validation_round + 1 - self.assertEquals(expected_value, row.v1) + assert expected_value == row.v1 rows = list(session.execute("SELECT * FROM test_counter_clustering")) - self.assertEquals(100, len(rows)) + assert 100 == len(rows) for i, row in enumerate(sorted(rows)): - self.assertEquals(i, row.k) - self.assertEquals(i, row.v1) + assert i == row.k + assert i == row.v1 expected_value = i if i < num_updates: expected_value += validation_round + 1 - self.assertEquals(expected_value, row.v2) + assert expected_value == row.v2 http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/hintedhandoff_test.py ---------------------------------------------------------------------- diff --git a/hintedhandoff_test.py b/hintedhandoff_test.py index 6345e3c..68d341e 100644 --- a/hintedhandoff_test.py +++ b/hintedhandoff_test.py @@ -1,11 +1,15 @@ import os import time +import pytest +import logging from cassandra import ConsistencyLevel -from dtest import DISABLE_VNODES, Tester, create_ks +from dtest import Tester, create_ks from tools.data import create_c1c2_table, insert_c1c2, query_c1c2 -from tools.decorators import no_vnodes, since + +since = pytest.mark.since +logger = logging.getLogger(__name__) @since('3.0') @@ -26,7 +30,7 @@ class TestHintedHandoffConfig(Tester): if config_options: cluster.set_configuration_options(values=config_options) - if DISABLE_VNODES: + if not self.dtest_config.use_vnodes: cluster.populate([2]).start() else: tokens = cluster.balanced_tokens(2) @@ -39,7 +43,7 @@ class TestHintedHandoffConfig(Tester): Launch a nodetool command and check there is no error, return the result """ out, err, _ = node.nodetool(cmd) - self.assertEqual('', err) + assert '' == err return out def _do_hinted_handoff(self, node1, node2, enabled, keyspace='ks'): @@ -65,13 +69,13 @@ class TestHintedHandoffConfig(Tester): # Check node2 for all the keys that should have been delivered via HH if enabled or not if not enabled session = self.patient_exclusive_cql_connection(node2, keyspace=keyspace) - for n in xrange(0, 100): + for n in range(0, 100): if enabled: query_c1c2(session, n, ConsistencyLevel.ONE) else: query_c1c2(session, n, ConsistencyLevel.ONE, 
tolerate_missing=True, must_be_missing=True) - def nodetool_test(self): + def test_nodetool(self): """ Test various nodetool commands """ @@ -79,25 +83,25 @@ class TestHintedHandoffConfig(Tester): for node in node1, node2: res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is running', res.rstrip()) + assert 'Hinted handoff is running' == res.rstrip() self._launch_nodetool_cmd(node, 'disablehandoff') res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is not running', res.rstrip()) + assert 'Hinted handoff is not running' == res.rstrip() self._launch_nodetool_cmd(node, 'enablehandoff') res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is running', res.rstrip()) + assert 'Hinted handoff is running' == res.rstrip() self._launch_nodetool_cmd(node, 'disablehintsfordc dc1') res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep), res.rstrip()) + assert 'Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep) == res.rstrip() self._launch_nodetool_cmd(node, 'enablehintsfordc dc1') res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is running', res.rstrip()) + assert 'Hinted handoff is running' == res.rstrip() - def hintedhandoff_disabled_test(self): + def test_hintedhandoff_disabled(self): """ Test gloabl hinted handoff disabled """ @@ -105,11 +109,11 @@ class TestHintedHandoffConfig(Tester): for node in node1, node2: res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is not running', res.rstrip()) + assert 'Hinted handoff is not running' == res.rstrip() self._do_hinted_handoff(node1, node2, False) - def hintedhandoff_enabled_test(self): + def test_hintedhandoff_enabled(self): """ Test global hinted handoff enabled """ @@ -117,12 +121,12 @@ class TestHintedHandoffConfig(Tester): for node in node1, node2: res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is running', res.rstrip()) + assert 'Hinted handoff is running' == res.rstrip() self._do_hinted_handoff(node1, node2, True) @since('4.0') - def hintedhandoff_setmaxwindow_test(self): + def test_hintedhandoff_setmaxwindow(self): """ Test global hinted handoff against max_hint_window_in_ms update via nodetool """ @@ -130,18 +134,18 @@ class TestHintedHandoffConfig(Tester): for node in node1, node2: res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is running', res.rstrip()) + assert 'Hinted handoff is running' == res.rstrip() res = self._launch_nodetool_cmd(node, 'getmaxhintwindow') - self.assertEqual('Current max hint window: 300000 ms', res.rstrip()) + assert 'Current max hint window: 300000 ms' == res.rstrip() self._do_hinted_handoff(node1, node2, True) node1.start(wait_other_notice=True) self._launch_nodetool_cmd(node, 'setmaxhintwindow 1') res = self._launch_nodetool_cmd(node, 'getmaxhintwindow') - self.assertEqual('Current max hint window: 1 ms', res.rstrip()) + assert 'Current max hint window: 1 ms' == res.rstrip() self._do_hinted_handoff(node1, node2, False, keyspace='ks2') - def hintedhandoff_dc_disabled_test(self): + def test_hintedhandoff_dc_disabled(self): """ Test global hinted handoff enabled with the dc disabled """ @@ -150,11 +154,11 @@ class TestHintedHandoffConfig(Tester): for node in node1, node2: res = self._launch_nodetool_cmd(node, 
'statushandoff') - self.assertEqual('Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep), res.rstrip()) + assert 'Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep) == res.rstrip() self._do_hinted_handoff(node1, node2, False) - def hintedhandoff_dc_reenabled_test(self): + def test_hintedhandoff_dc_reenabled(self): """ Test global hinted handoff enabled with the dc disabled first and then re-enabled """ @@ -163,20 +167,20 @@ class TestHintedHandoffConfig(Tester): for node in node1, node2: res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep), res.rstrip()) + assert 'Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep) == res.rstrip() for node in node1, node2: self._launch_nodetool_cmd(node, 'enablehintsfordc dc1') res = self._launch_nodetool_cmd(node, 'statushandoff') - self.assertEqual('Hinted handoff is running', res.rstrip()) + assert 'Hinted handoff is running' == res.rstrip() self._do_hinted_handoff(node1, node2, True) class TestHintedHandoff(Tester): - @no_vnodes() - def hintedhandoff_decom_test(self): + @pytest.mark.no_vnodes + def test_hintedhandoff_decom(self): self.cluster.populate(4).start(wait_for_binary_proto=True) [node1, node2, node3, node4] = self.cluster.nodelist() session = self.patient_cql_connection(node1) @@ -192,5 +196,5 @@ class TestHintedHandoff(Tester): node3.decommission(force=force) time.sleep(5) - for x in xrange(0, 100): + for x in range(0, 100): query_c1c2(session, x, ConsistencyLevel.ONE) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/internode_ssl_test.py ---------------------------------------------------------------------- diff --git a/internode_ssl_test.py b/internode_ssl_test.py index 4149d26..fff9985 100644 --- a/internode_ssl_test.py +++ b/internode_ssl_test.py @@ -1,11 +1,15 @@ -from dtest import Tester, debug, create_ks, create_cf +import logging + +from dtest import Tester, create_ks, create_cf from tools.data import putget from tools.misc import generate_ssl_stores +logger = logging.getLogger(__name__) + class TestInternodeSSL(Tester): - def putget_with_internode_ssl_test(self): + def test_putget_with_internode_ssl(self): """ Simple putget test with internode ssl enabled with default 'all' internode compression @@ -13,7 +17,7 @@ class TestInternodeSSL(Tester): """ self.__putget_with_internode_ssl_test('all') - def putget_with_internode_ssl_without_compression_test(self): + def test_putget_with_internode_ssl_without_compression(self): """ Simple putget test with internode ssl enabled without internode compression @@ -24,10 +28,10 @@ class TestInternodeSSL(Tester): def __putget_with_internode_ssl_test(self, internode_compression): cluster = self.cluster - debug("***using internode ssl***") - generate_ssl_stores(self.test_path) + logger.debug("***using internode ssl***") + generate_ssl_stores(self.fixture_dtest_setup.test_path) cluster.set_configuration_options({'internode_compression': internode_compression}) - cluster.enable_internode_ssl(self.test_path) + cluster.enable_internode_ssl(self.fixture_dtest_setup.test_path) cluster.populate(3).start() http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/jmx_auth_test.py ---------------------------------------------------------------------- diff --git a/jmx_auth_test.py b/jmx_auth_test.py index 99e7d80..99b227f 100644 --- a/jmx_auth_test.py +++ b/jmx_auth_test.py @@ -1,16 +1,19 @@ +import pytest +import 
logging from distutils.version import LooseVersion from ccmlib.node import ToolError - from dtest import Tester -from tools.decorators import since from tools.jmxutils import apply_jmx_authentication +since = pytest.mark.since +logger = logging.getLogger(__name__) + @since('3.6') class TestJMXAuth(Tester): - def basic_auth_test(self): + def test_basic_auth(self): """ Some basic smoke testing of JMX authentication and authorization. Uses nodetool as a means of exercising the JMX interface as JolokiaAgent @@ -29,21 +32,21 @@ class TestJMXAuth(Tester): session.execute("GRANT DESCRIBE ON ALL MBEANS TO jmx_user") session.execute("CREATE ROLE test WITH LOGIN=true and PASSWORD='abc123'") - with self.assertRaisesRegexp(ToolError, self.authentication_fail_message(node, 'baduser')): + with pytest.raises(ToolError, match=self.authentication_fail_message(node, 'baduser')): node.nodetool('-u baduser -pw abc123 gossipinfo') - with self.assertRaisesRegexp(ToolError, self.authentication_fail_message(node, 'test')): + with pytest.raises(ToolError, match=self.authentication_fail_message(node, 'test')): node.nodetool('-u test -pw badpassword gossipinfo') - with self.assertRaisesRegexp(ToolError, "Required key 'username' is missing"): + with pytest.raises(ToolError, match="Required key 'username' is missing"): node.nodetool('gossipinfo') # role must have LOGIN attribute - with self.assertRaisesRegexp(ToolError, 'jmx_user is not permitted to log in'): + with pytest.raises(ToolError, match='jmx_user is not permitted to log in'): node.nodetool('-u jmx_user -pw 321cba gossipinfo') # test doesn't yet have any privileges on the necessary JMX resources - with self.assertRaisesRegexp(ToolError, 'Access Denied'): + with pytest.raises(ToolError, match='Access Denied'): node.nodetool('-u test -pw abc123 gossipinfo') session.execute("GRANT jmx_user TO test")
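For reviewers following the nose-to-pytest conversion in the hunks above, the same handful of idioms repeats in every touched test module: test methods are renamed to a test_* prefix so pytest discovers them, unittest-style assertions become plain assert statements, assertRaisesRegexp becomes pytest.raises with the match= keyword (a regular expression, applied with re.search), the old dtest.debug() helper is replaced by a per-module logger, and the @since/@no_vnodes decorators become pytest marks. Below is a minimal, self-contained sketch of that pattern; the test class, message strings, and values are purely illustrative and not taken from the repository, and the since mark is only meaningful when a conftest/plugin (as in cassandra-dtest) interprets it.

    import logging
    import pytest

    # per-module logger, as added to each converted test module
    since = pytest.mark.since
    logger = logging.getLogger(__name__)

    @since('3.0')                       # custom mark; consumed by the project's conftest
    class TestExample:
        def test_example(self):         # was: def example_test(self)
            logger.debug("starting example")          # was: debug("starting example")
            result = 2 + 2
            assert 4 == result                        # was: self.assertEquals(4, result)
            # was: with self.assertRaisesRegexp(ValueError, 'bad value'):
            with pytest.raises(ValueError, match='bad value'):
                raise ValueError('bad value')

Because match= is treated as a regular expression, any regex metacharacters in an expected error message need escaping (e.g. re.escape) before being passed in.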