# conftest.py (cassandra-dtest, blob 49b2dda4)
# http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/conftest.py
"""pytest configuration, command line options and shared fixtures for the dtests.

This module registers the dtest specific command line options, configures
logging for every test, builds the per-test ``DTestSetup`` (cluster setup and
teardown, log checking, log archiving) and filters the collected tests based on
their markers and the supplied options.
"""
import pytest
import logging
import os
import shutil
import time
import re
import platform
import copy
import inspect
import subprocess

from dtest import running_in_docker, cleanup_docker_environment_before_test_execution

from datetime import datetime
from distutils.version import LooseVersion
from netifaces import AF_INET
from psutil import virtual_memory

import netifaces as ni

from ccmlib.common import validate_install_dir, get_version_from_build, is_win

from dtest_setup import DTestSetup
from dtest_setup_overrides import DTestSetupOverrides

logger = logging.getLogger(__name__)


class DTestConfig:
    """Plain holder for the dtest options supplied on the pytest command line."""

    def __init__(self):
        """Initialise every option to a placeholder; ``setup`` fills in the real values."""
        self.use_vnodes = True
        self.use_off_heap_memtables = False
        self.num_tokens = -1
        self.data_dir_count = -1
        self.force_execution_of_resource_intensive_tests = False
        self.skip_resource_intensive_tests = False
        self.cassandra_dir = None
        self.cassandra_version = None
        self.delete_logs = False
        self.execute_upgrade_tests = False
        self.disable_active_log_watching = False
        self.keep_test_dir = False
        self.enable_jacoco_code_coverage = False
        self.jemalloc_path = find_libjemalloc()

    def setup(self, request):
        """Populate this config from the options on ``request.config``.

        :param request: the pytest ``request`` fixture of the requesting test/fixture
        """
        get_option = request.config.getoption

        self.use_vnodes = get_option("--use-vnodes")
        self.use_off_heap_memtables = get_option("--use-off-heap-memtables")
        self.num_tokens = get_option("--num-tokens")
        self.data_dir_count = get_option("--data-dir-count-per-instance")
        self.force_execution_of_resource_intensive_tests = get_option("--force-resource-intensive-tests")
        self.skip_resource_intensive_tests = get_option("--skip-resource-intensive-tests")
        cassandra_dir = get_option("--cassandra-dir")
        if cassandra_dir is not None:
            # allow "~" in the supplied path
            self.cassandra_dir = os.path.expanduser(cassandra_dir)
        self.cassandra_version = get_option("--cassandra-version")
        self.delete_logs = get_option("--delete-logs")
        self.execute_upgrade_tests = get_option("--execute-upgrade-tests")
        self.disable_active_log_watching = get_option("--disable-active-log-watching")
        self.keep_test_dir = get_option("--keep-test-dir")
        self.enable_jacoco_code_coverage = get_option("--enable-jacoco-code-coverage")


def _get_marker(node, name):
    """Return the marker called ``name`` that applies to ``node``, or None.

    ``Node.get_marker`` was removed from pytest in favour of
    ``Node.get_closest_marker``; use the newer API when the installed pytest
    provides it and fall back to the old one otherwise. Both return objects
    exposing ``args`` and ``kwargs``.
    """
    lookup = getattr(node, "get_closest_marker", None)
    if lookup is None:
        lookup = node.get_marker
    return lookup(name)


def _option_or_ini(config, option_name, ini_name):
    """Return a setting from the command line, else from the loaded ini file, else None.

    Note: words are separated in variables in .ini land with a "_" while the
    command line arguments use "-".
    """
    try:
        value = config.getoption(option_name)
    except ValueError:
        # the option is not registered with this pytest installation
        value = None
    if value is None:
        value = config.inicfg.get(ini_name)
    return value


def check_required_loopback_interfaces_available():
    """
    We need at least 9 loopback interfaces configured to run almost all dtests. On Linux, loopback
    interfaces are automatically created as they are used, but on Mac they need to be explicitly
    created. Check if we're running on Mac (Darwin), and if so check we have at least 9 loopback
    interfaces available, otherwise bail out so we don't run the tests in a known bad config and
    give the user some helpful advice on how to get their machine into a good known config
    """
    if platform.system() != "Darwin":
        return

    # .get(): an interface without any IPv4 address has no AF_INET entry at all
    loopback_addresses = ni.ifaddresses('lo0').get(AF_INET, [])
    if len(loopback_addresses) < 9:
        pytest.exit("At least 9 loopback interfaces are required to run dtests. "
                    "On Mac you can create the required loopback interfaces by running "
                    "'for i in {1..9}; do sudo ifconfig lo0 alias 127.0.0.$i up; done;'")


def pytest_addoption(parser):
    """pytest hook: register the dtest specific command line options."""
    parser.addoption("--use-vnodes", action="store_true", default=False,
                     help="Determines wither or not to setup clusters using vnodes for tests")
    parser.addoption("--use-off-heap-memtables", action="store_true", default=False,
                     help="Enable Off Heap Memtables when creating test clusters for tests")
    # type=int so a value given on the command line has the same type as the default
    parser.addoption("--num-tokens", action="store", default=256, type=int,
                     help="Number of tokens to set num_tokens yaml setting to when creating instances "
                          "with vnodes enabled")
    parser.addoption("--data-dir-count-per-instance", action="store", default=3, type=int,
                     help="Control the number of data directories to create per instance")
    parser.addoption("--force-resource-intensive-tests", action="store_true", default=False,
                     help="Forces the execution of tests marked as resource_intensive")
    parser.addoption("--skip-resource-intensive-tests", action="store_true", default=False,
                     help="Skip all tests marked as resource_intensive")
    parser.addoption("--cassandra-dir", action="store", default=None,
                     help="The directory containing the built C* artifacts to run the tests against. "
                          "(e.g. the path to the root of a cloned C* git directory. Before executing dtests using "
                          "this directory you must build C* with 'ant clean jar'). If you're doing C* development and "
                          "want to run the tests this is almost always going to be the correct option.")
    parser.addoption("--cassandra-version", action="store", default=None,
                     help="A specific C* version to run the dtests against. The dtest framework will "
                          "pull the required artifacts for this version.")
    parser.addoption("--delete-logs", action="store_true", default=False,
                     help="Delete all generated logs created by a test after the completion of a test.")
    parser.addoption("--execute-upgrade-tests", action="store_true", default=False,
                     help="Execute Cassandra Upgrade Tests (e.g. tests annotated with the upgrade_test mark)")
    parser.addoption("--disable-active-log-watching", action="store_true", default=False,
                     help="Disable ccm active log watching, which will cause dtests to check for errors in the "
                          "logs in a single operation instead of semi-realtime processing by consuming "
                          "ccm _log_error_handler callbacks")
    parser.addoption("--keep-test-dir", action="store_true", default=False,
                     help="Do not remove/cleanup the test ccm cluster directory and it's artifacts "
                          "after the test completes")
    parser.addoption("--enable-jacoco-code-coverage", action="store_true", default=False,
                     help="Enable JaCoCo Code Coverage Support")


def sufficient_system_resources_for_resource_intensive_tests():
    """Return True if this machine has enough memory to run resource_intensive tests."""
    mem = virtual_memory()
    total_mem_gb = mem.total/1024/1024/1024
    logger.info("total available system memory is %dGB", total_mem_gb)
    # todo kjkj: do not hard code our bound.. for now just do 9 instances at 3gb a piece
    return total_mem_gb >= 9*3


@pytest.fixture(scope='function', autouse=True)
def fixture_dtest_setup_overrides():
    """
    no-op default implementation of fixture_dtest_setup_overrides.
    we run this when a test class hasn't implemented their own
    fixture_dtest_setup_overrides
    """
    return DTestSetupOverrides()


# Not exactly sure why, but this fixture needs to be scoped to function level and not
# session or class. If you invoke pytest with tests across multiple test classes, when scoped
# at session, the root logger appears to get reset between each test class invocation.
# This means that the first test to run not from the first test class (and all subsequent
# tests), will have the root logger reset and see a level of NOTSET. Re-applying the
# configuration for every test is not that much extra overhead vs. once per session
# in the grand scheme of things.
@pytest.fixture(scope="function", autouse=True)
def fixture_logging_setup(request):
    """Configure the root logger (and the "cassandra" logger) before every test."""
    config = request.config

    # set the root logger level to whatever the user asked for
    # all new loggers created will use the root logger as a template
    # essentially making this the "default" active log level
    log_level = logging.INFO
    # command line argument first, then the default from the loaded pytest.ini
    requested_level = _option_or_ini(config, "--log-level", "log_level")
    if requested_level is not None:
        log_level = logging.getLevelName(requested_level)

    logging.root.setLevel(log_level)

    # same lookup order for the log format; None keeps the logging module default
    logging_format = _option_or_ini(config, "--log-format", "log_format")

    logging.basicConfig(level=log_level,
                        format=logging_format)

    # next, regardless of the level we set above (and requested by the user),
    # reconfigure the "cassandra" logger to minimum INFO level to override the
    # logging level that the "cassandra.*" imports should use; DEBUG is just
    # insanely noisy and verbose, with the extra logging of very limited help
    # in the context of dtest execution
    if log_level == logging.DEBUG:
        cassandra_module_log_level = logging.INFO
    else:
        cassandra_module_log_level = log_level
    logging.getLogger("cassandra").setLevel(cassandra_module_log_level)


@pytest.fixture(scope="session")
def log_global_env_facts(pytestconfig):
    """Record whether vnodes are in use as a global property of the junit xml report.

    Relies on the session scoped ``pytestconfig`` fixture: a session scoped
    fixture cannot depend on the function scoped ``fixture_dtest_config``.
    """
    if pytestconfig.pluginmanager.hasplugin('junitxml'):
        my_junit = getattr(pytestconfig, '_xml', None)
        # the plugin can be loaded without a report being requested
        if my_junit is not None:
            my_junit.add_global_property('USE_VNODES', pytestconfig.getoption("--use-vnodes"))


@pytest.fixture
def fixture_dtest_config(request, fixture_logging_setup):
    """Return a ``DTestConfig`` populated from the command line options."""
    # although we don't use fixture_logging_setup here, we do want to
    # have that fixture run as a prerequisite to this one.. and right now
    # this is the only way that can be done with pytests
    dtest_config = DTestConfig()
    dtest_config.setup(request)
    return dtest_config


@pytest.fixture(scope='function', autouse=True)
def fixture_maybe_skip_tests_requiring_novnodes(request):
    """
    Fixture run before the start of every test function that checks if the test is marked with
    the no_vnodes annotation but the tests were started with a configuration that
    has vnodes enabled. This should always be a no-op as we explicitly deselect tests
    in pytest_collection_modifyitems that match this configuration -- but this is explicit :)
    """
    if _get_marker(request.node, 'no_vnodes') is not None:
        if request.config.getoption("--use-vnodes"):
            pytest.skip("Skipping test marked with no_vnodes as tests executed with vnodes enabled via the "
                        "--use-vnodes command line argument")


@pytest.fixture(scope='function', autouse=True)
def fixture_log_test_name_and_date(request):
    """Log the name of the test about to run together with the current time."""
    logger.info("Starting execution of %s at %s", request.node.name, str(datetime.now()))


def _filter_errors(dtest_setup, errors):
    """Filter errors, removing those that match ignore_log_patterns in the current DTestSetup"""
    for e in errors:
        if not any(re.search(pattern, repr(e)) for pattern in dtest_setup.ignore_log_patterns):
            yield e


def check_logs_for_errors(dtest_setup):
    """Return the unexpected errors found in the logs of every node of the cluster.

    Errors matching ``dtest_setup.ignore_log_patterns`` are dropped. Every
    remaining non-empty error is logged and returned as a stripped ``str``.

    :param dtest_setup: the DTestSetup owning the cluster to inspect
    :return: list of error strings collected across all nodes (empty if none)
    """
    all_errors = []
    for node in dtest_setup.cluster.nodelist():
        node_errors = _filter_errors(dtest_setup, ['\n'.join(msg) for msg in node.grep_log_for_errors()])
        for error in node_errors:
            if isinstance(error, (bytes, bytearray)):
                error_str = error.decode("utf-8").strip()
            else:
                error_str = error.strip()

            if error_str:
                logger.error("Unexpected error in {node_name} log, error: \n{error}"
                             .format(node_name=node.name, error=error_str))
                # accumulate in a separate list: keeps the errors of earlier nodes
                # and never mutates the sequence being iterated
                all_errors.append(error_str)
    return all_errors


def copy_logs(request, cluster, directory=None, name=None):
    """Copy the current cluster's log files somewhere, by default to LOG_SAVED_DIR with a name of 'last'

    The logs of every node are copied into a new ``<millis>_<test name>``
    directory below ``directory`` and (except on Windows) a symlink called
    ``name`` is pointed at that directory.

    :param request: pytest request of the running test (supplies the test name)
    :param cluster: the ccm cluster whose node logs are copied
    :param directory: destination directory, "logs" when None
    :param name: name of the symlink inside ``directory``, "logs/last" when None
    """
    log_saved_dir = "logs"
    try:
        os.mkdir(log_saved_dir)
    except OSError:
        # best effort: usually the directory simply exists already
        pass

    if directory is None:
        directory = log_saved_dir
    if name is None:
        name = os.path.join(log_saved_dir, "last")
    else:
        name = os.path.join(directory, name)
    os.makedirs(directory, exist_ok=True)

    nodes = list(cluster.nodes.values())
    if not nodes:
        return

    basedir = str(int(time.time() * 1000)) + '_' + request.node.name
    logdir = os.path.join(directory, basedir)
    os.mkdir(logdir)
    for node in nodes:
        sources = ((node.logfilename(), ".log"),
                   (node.debuglogfilename(), "_debug.log"),
                   (node.gclogfilename(), "_gc.log"),
                   (node.compactionlogfilename(), "_compaction.log"))
        for source, suffix in sources:
            # not every log file exists for every node/version
            if os.path.exists(source):
                shutil.copyfile(source, os.path.join(logdir, node.name + suffix))

    # lexists: also remove a dangling link, which os.path.exists reports as missing
    if os.path.lexists(name):
        os.unlink(name)
    if not is_win():
        os.symlink(basedir, name)


def reset_environment_vars(initial_environment):
    """Restore ``os.environ`` to ``initial_environment``, keeping PYTEST_CURRENT_TEST.

    :param initial_environment: mapping of the environment to restore
    """
    pytest_current_test = os.environ.get('PYTEST_CURRENT_TEST')
    os.environ.clear()
    os.environ.update(initial_environment)
    # os.environ only accepts strings; the variable may not be set at all
    if pytest_current_test is not None:
        os.environ['PYTEST_CURRENT_TEST'] = pytest_current_test


@pytest.fixture(scope='function', autouse=False)
def fixture_dtest_setup(request, parse_dtest_config, fixture_dtest_setup_overrides, fixture_logging_setup):
    """Create the cluster for a test, yield its ``DTestSetup`` and tear it down afterwards.

    On teardown the environment is restored, driver connections are shut down,
    the node logs are checked for unexpected errors (failing the test if any are
    found), the logs are archived and the cluster is cleaned up.
    """
    if running_in_docker():
        cleanup_docker_environment_before_test_execution()

    # do all of our setup operations to get the enviornment ready for the actual test
    # to run (e.g. bring up a cluster with the necessary config, populate variables, etc)
    initial_environment = dict(os.environ)
    dtest_setup = DTestSetup(dtest_config=parse_dtest_config, setup_overrides=fixture_dtest_setup_overrides)
    dtest_setup.initialize_cluster()

    if not parse_dtest_config.disable_active_log_watching:
        dtest_setup.log_watch_thread = dtest_setup.begin_active_log_watch()

    # at this point we're done with our setup operations in this fixture
    # yield to allow the actual test to run
    yield dtest_setup

    # phew! we're back after executing the test, now we need to do
    # all of our teardown and cleanup operations

    reset_environment_vars(initial_environment)
    dtest_setup.jvm_args = []

    for con in dtest_setup.connections:
        con.cluster.shutdown()
    dtest_setup.connections = []

    failed = False
    try:
        if not dtest_setup.allow_log_errors:
            errors = check_logs_for_errors(dtest_setup)
            if errors:
                failed = True
                # message passed positionally: the keyword name differs between pytest versions
                pytest.fail('Unexpected error found in node logs (see stdout for full details). Errors: [{errors}]'
                            .format(errors=", ".join(errors)), pytrace=False)
    finally:
        try:
            # save the logs for inspection
            if failed or not parse_dtest_config.delete_logs:
                copy_logs(request, dtest_setup.cluster)
        except Exception as e:
            # failing to archive the logs must not prevent the cluster cleanup
            logger.error("Error saving log: %s", e)
        finally:
            dtest_setup.cleanup_cluster()


def _skip_msg(current_running_version, since_version, max_version):
    """Return why a test must be skipped for the running version, or None to run it."""
    if current_running_version < since_version:
        return "%s < %s" % (current_running_version, since_version)
    if max_version and current_running_version > max_version:
        return "%s > %s" % (current_running_version, max_version)
    return None


@pytest.fixture(autouse=True)
def fixture_since(request, fixture_dtest_setup):
    """Skip tests whose ``since`` marker excludes the version of the running cluster."""
    since_marker = _get_marker(request.node, 'since')
    if since_marker is None:
        return

    max_version_str = since_marker.kwargs.get('max_version', None)
    max_version = None
    if max_version_str:
        max_version = LooseVersion(max_version_str)

    since = LooseVersion(since_marker.args[0])
    current_running_version = fixture_dtest_setup.cluster.version()
    skip_msg = _skip_msg(current_running_version, since, max_version)
    if skip_msg:
        pytest.skip(skip_msg)


@pytest.fixture(scope='session', autouse=True)
def install_debugging_signal_handler():
    """Enable ``faulthandler`` for the whole test session."""
    import faulthandler
    faulthandler.enable()


@pytest.fixture(scope='function')
def parse_dtest_config(request):
    """Yield a validated ``DTestConfig`` built from the command line options.

    Exits the pytest run if the loopback interfaces are missing (Mac only) or
    if the supplied --cassandra-dir is not a valid install directory.
    """
    dtest_config = DTestConfig()
    dtest_config.setup(request)

    # if we're on mac, check that we have the required loopback interfaces before doing anything!
    check_required_loopback_interfaces_available()

    try:
        if dtest_config.cassandra_dir is not None:
            validate_install_dir(dtest_config.cassandra_dir)
    except Exception as e:
        pytest.exit("{}. Did you remember to build C*? ('ant clean jar')".format(e))

    yield dtest_config


def _module_has_upgrade_test_class(module):
    """Return True if any class visible in ``module`` carries the upgrade_test mark."""
    for _, candidate in inspect.getmembers(module, inspect.isclass):
        if not hasattr(candidate, "pytestmark"):
            continue
        marks = candidate.pytestmark
        # pytestmark may be a single mark instead of a list of marks
        if not isinstance(marks, (list, tuple)):
            marks = [marks]
        if any(getattr(mark, "name", None) == "upgrade_test" for mark in marks):
            return True
    return False


def pytest_collection_modifyitems(items, config):
    """
    This function is called upon during the pytest test collection phase and allows for modification
    of the test items within the list
    """
    if not config.getoption("--collect-only") and config.getoption("--cassandra-dir") is None:
        if config.getoption("--cassandra-version") is None:
            raise Exception("Required dtest arguments were missing! You must provide either --cassandra-dir "
                            "or --cassandra-version. Refer to the documentation or invoke the help with --help.")

    selected_items = []
    deselected_items = []

    sufficient_system_resources_resource_intensive = sufficient_system_resources_for_resource_intensive_tests()
    logger.debug("has sufficient resources? %s", sufficient_system_resources_resource_intensive)

    # the options do not change between items: read them once
    force_resource_intensive = config.getoption("--force-resource-intensive-tests")
    skip_resource_intensive = config.getoption("--skip-resource-intensive-tests")
    use_vnodes = config.getoption("--use-vnodes")
    execute_upgrade_tests = config.getoption("--execute-upgrade-tests")

    # module -> result of the class scan, so each module is inspected only once
    upgrade_test_modules = {}

    for item in items:
        # set a timeout for all tests, it may be overwritten at the test level with an additional marker
        if _get_marker(item, "timeout") is None:
            item.add_marker(pytest.mark.timeout(60*15))

        deselect_test = False

        # --force-resource-intensive-tests overrides both reasons for deselecting
        if _get_marker(item, "resource_intensive") is not None and not force_resource_intensive:
            if skip_resource_intensive:
                deselect_test = True
                logger.info("SKIP: Deselecting test %s as test marked resource_intensive. To force execution of "
                            "this test re-run with the --force-resource-intensive-tests command line argument" % item.name)
            if not sufficient_system_resources_resource_intensive:
                deselect_test = True
                logger.info("SKIP: Deselecting resource_intensive test %s due to insufficient system resources" % item.name)

        if _get_marker(item, "no_vnodes") is not None:
            if use_vnodes:
                deselect_test = True
                logger.info("SKIP: Deselecting test %s as the test requires vnodes to be disabled. To run this test, "
                            "re-run without the --use-vnodes command line argument" % item.name)

        if _get_marker(item, "vnodes") is not None:
            if not use_vnodes:
                deselect_test = True
                logger.info("SKIP: Deselecting test %s as the test requires vnodes to be enabled. To run this test, "
                            "re-run with the --use-vnodes command line argument" % item.name)

        if not execute_upgrade_tests:
            # a test is an upgrade test if it is marked itself or if any class
            # in its module is marked with upgrade_test
            module = item.module
            if module not in upgrade_test_modules:
                upgrade_test_modules[module] = _module_has_upgrade_test_class(module)
            if upgrade_test_modules[module] or _get_marker(item, "upgrade_test") is not None:
                deselect_test = True

        # todo kjkj: deal with no_offheap_memtables mark

        if deselect_test:
            deselected_items.append(item)
        else:
            selected_items.append(item)

    config.hook.pytest_deselected(items=deselected_items)
    items[:] = selected_items


# Determine the location of the libjemalloc jar so that we can specify it
# through environment variables when start Cassandra. This reduces startup
# time, making the dtests run faster.
def find_libjemalloc():
    """Locate libjemalloc by running the ``findlibjemalloc.sh`` script next to this file.

    :return: the path printed by the script; "-" (tells C* not to look for
             libjemalloc) if the script printed nothing or wrote to stderr; ""
             on Windows or if the script could not be run. Always a ``str``.
    """
    if is_win():
        # let the normal bat script handle finding libjemalloc
        return ""

    this_dir = os.path.dirname(os.path.realpath(__file__))
    script = os.path.join(this_dir, "findlibjemalloc.sh")
    try:
        p = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
    except Exception as exc:
        print("Failed to run script to prelocate libjemalloc ({}): {}".format(script, exc))
        return ""

    if stderr or not stdout:
        return "-"  # tells C* not to look for libjemalloc

    # the pipe yields bytes; decode so every branch returns str
    # (environment variables only accept str)
    path = os.fsdecode(stdout).strip()
    return path or "-"
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/consistency_test.py ---------------------------------------------------------------------- diff --git a/consistency_test.py b/consistency_test.py index 2eaa2ca..368dba0 100644 --- a/consistency_test.py +++ b/consistency_test.py @@ -1,36 +1,30 @@ -import Queue +import queue import sys import threading import time +import pytest +import logging from collections import OrderedDict, namedtuple from copy import deepcopy from cassandra import ConsistencyLevel, consistency_value_to_name from cassandra.query import SimpleStatement -from nose.plugins.attrib import attr -from nose.tools import assert_greater_equal from tools.assertions import (assert_all, assert_length_equal, assert_none, assert_unavailable) -from dtest import DISABLE_VNODES, MultiError, Tester, debug, create_ks, create_cf +from dtest import MultiError, Tester, create_ks, create_cf from tools.data import (create_c1c2_table, insert_c1c2, insert_columns, query_c1c2, rows_to_list) -from tools.decorators import since from tools.jmxutils import JolokiaAgent, make_mbean, remove_perf_disable_shared_mem +since = pytest.mark.since +logger = logging.getLogger(__name__) + ExpectedConsistency = namedtuple('ExpectedConsistency', ('num_write_nodes', 'num_read_nodes', 'is_strong')) class TestHelper(Tester): - def __init__(self, *args, **kwargs): - Tester.__init__(self, *args, **kwargs) - self.lock = threading.Lock() - - def log(self, message): - with self.lock: - debug(message) - def _is_local(self, cl): return (cl == ConsistencyLevel.LOCAL_QUORUM or cl == ConsistencyLevel.LOCAL_ONE or @@ -51,12 +45,12 @@ class TestHelper(Tester): ConsistencyLevel.ONE: 1, ConsistencyLevel.TWO: 2, ConsistencyLevel.THREE: 3, - ConsistencyLevel.QUORUM: sum(rf_factors) / 2 + 1, + ConsistencyLevel.QUORUM: sum(rf_factors) // 2 + 1, ConsistencyLevel.ALL: sum(rf_factors), - ConsistencyLevel.LOCAL_QUORUM: rf_factors[dc] / 2 + 1, - ConsistencyLevel.EACH_QUORUM: rf_factors[dc] / 2 + 
1, - ConsistencyLevel.SERIAL: sum(rf_factors) / 2 + 1, - ConsistencyLevel.LOCAL_SERIAL: rf_factors[dc] / 2 + 1, + ConsistencyLevel.LOCAL_QUORUM: rf_factors[dc] // 2 + 1, + ConsistencyLevel.EACH_QUORUM: rf_factors[dc] // 2 + 1, + ConsistencyLevel.SERIAL: sum(rf_factors) // 2 + 1, + ConsistencyLevel.LOCAL_SERIAL: rf_factors[dc] // 2 + 1, ConsistencyLevel.LOCAL_ONE: 1, }[cl] @@ -73,7 +67,7 @@ class TestHelper(Tester): :return: the data center corresponding to this node """ dc = 0 - for i in xrange(1, len(nodes)): + for i in range(1, len(nodes)): if idx < sum(nodes[:i]): break dc += 1 @@ -101,7 +95,7 @@ class TestHelper(Tester): if self._is_local(cl): return num_nodes_alive[current] >= self._required_nodes(cl, rf_factors, current) elif cl == ConsistencyLevel.EACH_QUORUM: - for i in xrange(0, len(rf_factors)): + for i in range(0, len(rf_factors)): if num_nodes_alive[i] < self._required_nodes(cl, rf_factors, i): return False return True @@ -132,7 +126,7 @@ class TestHelper(Tester): # StorageProxy.getLiveSortedEndpoints(), which is called by the AbstractReadExecutor # to determine the target replicas. The default case, a SimpleSnitch wrapped in # a dynamic snitch, may rarely choose a different replica. 
- debug('Changing snitch for single dc case') + logger.debug('Changing snitch for single dc case') for node in cluster.nodelist(): node.data_center = 'dc1' cluster.set_configuration_options(values={ @@ -208,7 +202,7 @@ class TestHelper(Tester): expected = [[userid, age]] if age else [] ret = rows_to_list(res) == expected if check_ret: - self.assertTrue(ret, "Got {} from {}, expected {} at {}".format(rows_to_list(res), session.cluster.contact_points, expected, consistency_value_to_name(consistency))) + assert ret, "Got {} from {}, expected {} at {}".format(rows_to_list(res), session.cluster.contact_points, expected, consistency_value_to_name(consistency)) return ret def create_counters_table(self, session, requires_local_reads): @@ -233,10 +227,10 @@ class TestHelper(Tester): statement = SimpleStatement("SELECT * from counters WHERE id = {}".format(id), consistency_level=consistency) ret = rows_to_list(session.execute(statement)) if check_ret: - self.assertEqual(ret[0][1], val, "Got {} from {}, expected {} at {}".format(ret[0][1], + assert ret[0][1] == val, "Got {} from {}, expected {} at {}".format(ret[0][1], session.cluster.contact_points, val, - consistency_value_to_name(consistency))) + consistency_value_to_name(consistency)) return ret[0][1] if ret else 0 @@ -255,8 +249,8 @@ class TestAvailability(TestHelper): rf = self.rf num_alive = nodes - for node in xrange(nodes): - debug('Testing node {} in single dc with {} nodes alive'.format(node, num_alive)) + for node in range(nodes): + logger.debug('Testing node {} in single dc with {} nodes alive'.format(node, num_alive)) session = self.patient_exclusive_cql_connection(cluster.nodelist()[node], self.ksname) for combination in combinations: self._test_insert_query_from_node(session, 0, [rf], [num_alive], *combination) @@ -274,12 +268,12 @@ class TestAvailability(TestHelper): rf = self.rf nodes_alive = deepcopy(nodes) - rf_factors = rf.values() + rf_factors = list(rf.values()) - for i in xrange(0, len(nodes)): # for 
each dc - self.log('Testing dc {} with rf {} and {} nodes alive'.format(i, rf_factors[i], nodes_alive)) - for n in xrange(nodes[i]): # for each node in this dc - self.log('Testing node {} in dc {} with {} nodes alive'.format(n, i, nodes_alive)) + for i in range(0, len(nodes)): # for each dc + logger.debug('Testing dc {} with rf {} and {} nodes alive'.format(i, rf_factors[i], nodes_alive)) + for n in range(nodes[i]): # for each node in this dc + logger.debug('Testing node {} in dc {} with {} nodes alive'.format(n, i, nodes_alive)) node = n + sum(nodes[:i]) session = self.patient_exclusive_cql_connection(cluster.nodelist()[node], self.ksname) for combination in combinations: @@ -292,7 +286,7 @@ class TestAvailability(TestHelper): """ Test availability for read and write via the session passed in as a parameter. """ - self.log("Connected to %s for %s/%s/%s" % + logger.debug("Connected to %s for %s/%s/%s" % (session.cluster.contact_points, consistency_value_to_name(write_cl), consistency_value_to_name(read_cl), consistency_value_to_name(serial_cl))) start = 0 @@ -300,13 +294,13 @@ class TestAvailability(TestHelper): age = 30 if self._should_succeed(write_cl, rf_factors, num_nodes_alive, dc_idx): - for n in xrange(start, end): + for n in range(start, end): self.insert_user(session, n, age, write_cl, serial_cl) else: assert_unavailable(self.insert_user, session, end, age, write_cl, serial_cl) if self._should_succeed(read_cl, rf_factors, num_nodes_alive, dc_idx): - for n in xrange(start, end): + for n in range(start, end): self.query_user(session, n, age, read_cl, check_ret) else: assert_unavailable(self.query_user, session, end, age, read_cl, check_ret) @@ -361,7 +355,7 @@ class TestAvailability(TestHelper): self._test_simple_strategy(combinations) - @attr("resource-intensive") + @pytest.mark.resource_intensive def test_network_topology_strategy(self): """ Test for multiple datacenters, using network topology replication strategy. 
@@ -393,7 +387,7 @@ class TestAvailability(TestHelper): self._test_network_topology_strategy(combinations) - @attr("resource-intensive") + @pytest.mark.resource_intensive @since("3.0") def test_network_topology_strategy_each_quorum(self): """ @@ -432,7 +426,7 @@ class TestAccuracy(TestHelper): self.read_cl = read_cl self.serial_cl = serial_cl - outer.log('Testing accuracy with WRITE/READ/SERIAL consistency set to {}/{}/{} (keys : {} to {})' + logger.debug('Testing accuracy with WRITE/READ/SERIAL consistency set to {}/{}/{} (keys : {} to {})' .format(consistency_value_to_name(write_cl), consistency_value_to_name(read_cl), consistency_value_to_name(serial_cl), start, end - 1)) def get_expected_consistency(self, idx): @@ -459,12 +453,10 @@ class TestAccuracy(TestHelper): for s in sessions: if outer.query_user(s, n, val, read_cl, check_ret=expected_consistency.is_strong): num += 1 - assert_greater_equal(num, expected_consistency.num_write_nodes, - "Failed to read value from sufficient number of nodes," - " required {} but got {} - [{}, {}]" - .format(expected_consistency.num_write_nodes, num, n, val)) + assert num >= expected_consistency.num_write_nodes, "Failed to read value from sufficient number of nodes," + \ + " required {} but got {} - [{}, {}]".format(expected_consistency.num_write_nodes, num, n, val) - for n in xrange(start, end): + for n in range(start, end): age = 30 for s in range(0, len(sessions)): outer.insert_user(sessions[s], n, age, write_cl, serial_cl) @@ -499,12 +491,10 @@ class TestAccuracy(TestHelper): for s in sessions: results.append(outer.query_counter(s, n, val, read_cl, check_ret=expected_consistency.is_strong)) - assert_greater_equal(results.count(val), expected_consistency.num_write_nodes, - "Failed to read value from sufficient number of nodes, required {} nodes to have a" - " counter value of {} at key {}, instead got these values: {}" - .format(expected_consistency.num_write_nodes, val, n, results)) + assert results.count(val) >= 
expected_consistency.num_write_nodes, "Failed to read value from sufficient number of nodes, required {} nodes to have a" + \ + " counter value of {} at key {}, instead got these values: {}".format(expected_consistency.num_write_nodes, val, n, results) - for n in xrange(start, end): + for n in range(start, end): c = 1 for s in range(0, len(sessions)): outer.update_counter(sessions[s], n, write_cl, serial_cl) @@ -534,15 +524,15 @@ class TestAccuracy(TestHelper): self._start_cluster(save_sessions=True, requires_local_reads=requires_local_reads) - input_queue = Queue.Queue() - exceptions_queue = Queue.Queue() + input_queue = queue.Queue() + exceptions_queue = queue.Queue() def run(): while not input_queue.empty(): try: v = TestAccuracy.Validation(self, self.sessions, nodes, rf_factors, *input_queue.get(block=False)) valid_fcn(v) - except Queue.Empty: + except queue.Empty: pass except Exception: exceptions_queue.put(sys.exc_info()) @@ -560,17 +550,17 @@ class TestAccuracy(TestHelper): t.start() threads.append(t) - self.log("Waiting for workers to complete") + logger.debug("Waiting for workers to complete") while exceptions_queue.empty(): time.sleep(0.1) - if len(filter(lambda t: t.isAlive(), threads)) == 0: + if len([t for t in threads if t.isAlive()]) == 0: break if not exceptions_queue.empty(): - _, exceptions, tracebacks = zip(*exceptions_queue.queue) + _, exceptions, tracebacks = list(zip(*exceptions_queue.queue)) raise MultiError(exceptions=exceptions, tracebacks=tracebacks) - @attr("resource-intensive") + @pytest.mark.resource_intensive def test_simple_strategy_users(self): """ Test for a single datacenter, users table, only the each quorum reads. 
@@ -599,10 +589,10 @@ class TestAccuracy(TestHelper): (ConsistencyLevel.QUORUM, ConsistencyLevel.LOCAL_SERIAL, ConsistencyLevel.SERIAL), ] - self.log("Testing single dc, users") + logger.debug("Testing single dc, users") self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, [self.nodes], [self.rf], combinations) - @attr("resource-intensive") + @pytest.mark.resource_intensive @since("3.0") def test_simple_strategy_each_quorum_users(self): """ @@ -617,10 +607,10 @@ class TestAccuracy(TestHelper): (ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM), ] - self.log("Testing single dc, users, each quorum reads") + logger.debug("Testing single dc, users, each quorum reads") self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, [self.nodes], [self.rf], combinations) - @attr("resource-intensive") + @pytest.mark.resource_intensive def test_network_topology_strategy_users(self): """ Test for multiple datacenters, users table. @@ -653,10 +643,10 @@ class TestAccuracy(TestHelper): (ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL), ] - self.log("Testing multiple dcs, users") - self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, self.rf.values(), combinations), + logger.debug("Testing multiple dcs, users") + self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, list(self.rf.values()), combinations), - @attr("resource-intensive") + @pytest.mark.resource_intensive @since("3.0") def test_network_topology_strategy_each_quorum_users(self): """ @@ -672,8 +662,8 @@ class TestAccuracy(TestHelper): (ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM), ] - self.log("Testing multiple dcs, users, each quorum reads") - self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, self.rf.values(), combinations) + logger.debug("Testing multiple dcs, users, each quorum reads") + 
self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, list(self.rf.values()), combinations) def test_simple_strategy_counters(self): """ @@ -700,7 +690,7 @@ class TestAccuracy(TestHelper): (ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.LOCAL_QUORUM), ] - self.log("Testing single dc, counters") + logger.debug("Testing single dc, counters") self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, [self.nodes], [self.rf], combinations) @since("3.0") @@ -718,10 +708,10 @@ class TestAccuracy(TestHelper): (ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM), ] - self.log("Testing single dc, counters, each quorum reads") + logger.debug("Testing single dc, counters, each quorum reads") self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, [self.nodes], [self.rf], combinations) - @attr("resource-intensive") + @pytest.mark.resource_intensive def test_network_topology_strategy_counters(self): """ Test for multiple datacenters, counters table. 
@@ -749,10 +739,10 @@ class TestAccuracy(TestHelper): (ConsistencyLevel.TWO, ConsistencyLevel.ONE), ] - self.log("Testing multiple dcs, counters") - self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, self.rf.values(), combinations), + logger.debug("Testing multiple dcs, counters") + self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, list(self.rf.values()), combinations), - @attr("resource-intensive") + @pytest.mark.resource_intensive @since("3.0") def test_network_topology_strategy_each_quorum_counters(self): """ @@ -768,8 +758,8 @@ class TestAccuracy(TestHelper): (ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM), ] - self.log("Testing multiple dcs, counters, each quorum reads") - self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, self.rf.values(), combinations), + logger.debug("Testing multiple dcs, counters, each quorum reads") + self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, list(self.rf.values()), combinations), class TestConsistency(Tester): @@ -1105,7 +1095,7 @@ class TestConsistency(Tester): srp = make_mbean('metrics', type='Table', name='ShortReadProtectionRequests', keyspace='test', scope='test') with JolokiaAgent(node1) as jmx: # 4 srp requests for node1 and 5 for node2, total of 9 - self.assertEqual(9, jmx.read_attribute(srp, 'Count')) + assert 9 == jmx.read_attribute(srp, 'Count') @since('3.0') def test_12872(self): @@ -1174,12 +1164,17 @@ class TestConsistency(Tester): [[0], [4]], cl=ConsistencyLevel.ALL) - def short_read_test(self): + def test_short_read(self): """ @jira_ticket CASSANDRA-9460 """ cluster = self.cluster + # this test causes the python driver to be extremely noisy due to + # frequent starting and stopping of nodes. 
let's move the log level + # of the driver to ERROR for this test only + logging.getLogger("cassandra").setLevel('ERROR') + # Disable hinted handoff and set batch commit log so this doesn't # interfer with the test cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) @@ -1196,13 +1191,13 @@ class TestConsistency(Tester): reversed_key = 'reversed' # Repeat this test 10 times to make it more easy to spot a null pointer exception caused by a race, see CASSANDRA-9460 - for k in xrange(10): + for k in range(10): # insert 9 columns in two rows insert_columns(self, session, normal_key, 9) insert_columns(self, session, reversed_key, 9) # Delete 3 first columns (and 3 last columns, for the reversed version) with a different node dead each time - for node, column_number_to_delete in zip(range(1, 4), range(3)): + for node, column_number_to_delete in zip(list(range(1, 4)), list(range(3))): self.stop_node(node) self.delete(node, normal_key, column_number_to_delete) self.delete(node, reversed_key, 8 - column_number_to_delete) @@ -1218,8 +1213,8 @@ class TestConsistency(Tester): assert_length_equal(res, 3) # value 0, 1 and 2 have been deleted - for i in xrange(1, 4): - self.assertEqual('value{}'.format(i + 2), res[i - 1][1]) + for i in range(1, 4): + assert 'value{}'.format(i + 2) == res[i - 1][1] # Query 3 firsts columns in reverse order session = self.patient_cql_connection(node1, 'ks') @@ -1231,12 +1226,12 @@ class TestConsistency(Tester): assert_length_equal(res, 3) # value 6, 7 and 8 have been deleted - for i in xrange(0, 3): - self.assertEqual('value{}'.format(5 - i), res[i][1]) + for i in range(0, 3): + assert 'value{}'.format(5 - i) == res[i][1] session.execute('TRUNCATE cf') - def short_read_delete_test(self): + def test_short_read_delete(self): """ Test short reads ultimately leaving no columns alive [#4000] """ cluster = self.cluster @@ -1269,7 +1264,7 @@ class TestConsistency(Tester): assert_none(session, "SELECT c, v FROM cf WHERE key=\'k0\' 
LIMIT 1", cl=ConsistencyLevel.QUORUM) - def short_read_quorum_delete_test(self): + def test_short_read_quorum_delete(self): """ @jira_ticket CASSANDRA-8933 """ @@ -1311,11 +1306,11 @@ class TestConsistency(Tester): node3.stop(wait_other_notice=True) assert_none(session, "SELECT * FROM t WHERE id = 0 LIMIT 1", cl=ConsistencyLevel.QUORUM) - def readrepair_test(self): + def test_readrepair(self): cluster = self.cluster cluster.set_configuration_options(values={'hinted_handoff_enabled': False}) - if DISABLE_VNODES: + if not self.dtest_config.use_vnodes: cluster.populate(2).start() else: tokens = cluster.balanced_tokens(2) @@ -1333,43 +1328,43 @@ class TestConsistency(Tester): node2.start(wait_for_binary_proto=True, wait_other_notice=True) # query everything to cause RR - for n in xrange(0, 10000): + for n in range(0, 10000): query_c1c2(session, n, ConsistencyLevel.QUORUM) node1.stop(wait_other_notice=True) # Check node2 for all the keys that should have been repaired session = self.patient_cql_connection(node2, keyspace='ks') - for n in xrange(0, 10000): + for n in range(0, 10000): query_c1c2(session, n, ConsistencyLevel.ONE) - def quorum_available_during_failure_test(self): - CL = ConsistencyLevel.QUORUM - RF = 3 + def test_quorum_available_during_failure(self): + cl = ConsistencyLevel.QUORUM + rf = 3 - debug("Creating a ring") + logger.debug("Creating a ring") cluster = self.cluster - if DISABLE_VNODES: + if not self.dtest_config.use_vnodes: cluster.populate(3).start() else: tokens = cluster.balanced_tokens(3) cluster.populate(3, tokens=tokens).start() node1, node2, node3 = cluster.nodelist() - debug("Set to talk to node 2") + logger.debug("Set to talk to node 2") session = self.patient_cql_connection(node2) - create_ks(session, 'ks', RF) + create_ks(session, 'ks', rf) create_c1c2_table(self, session) - debug("Generating some data") - insert_c1c2(session, n=100, consistency=CL) + logger.debug("Generating some data") + insert_c1c2(session, n=100, consistency=cl) - 
debug("Taking down node1") + logger.debug("Taking down node1") node1.stop(wait_other_notice=True) - debug("Reading back data.") - for n in xrange(100): - query_c1c2(session, n, CL) + logger.debug("Reading back data.") + for n in range(100): + query_c1c2(session, n, cl) def stop_node(self, node_number): to_stop = self.cluster.nodes["node%d" % node_number] http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/consistent_bootstrap_test.py ---------------------------------------------------------------------- diff --git a/consistent_bootstrap_test.py b/consistent_bootstrap_test.py index ada9b39..24626f3 100644 --- a/consistent_bootstrap_test.py +++ b/consistent_bootstrap_test.py @@ -1,92 +1,100 @@ +import pytest +import logging + from cassandra import ConsistencyLevel -from dtest import Tester, debug, create_ks +from dtest import Tester, create_ks from tools.data import create_c1c2_table, insert_c1c2, query_c1c2 -from tools.decorators import no_vnodes from tools.misc import new_node +logger = logging.getLogger(__name__) + class TestBootstrapConsistency(Tester): - @no_vnodes() - def consistent_reads_after_move_test(self): - debug("Creating a ring") + @pytest.mark.no_vnodes + def test_consistent_reads_after_move(self): + logger.debug("Creating a ring") cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'write_request_timeout_in_ms': 60000, - 'read_request_timeout_in_ms': 60000, 'dynamic_snitch_badness_threshold': 0.0}) + cluster.set_configuration_options(values={'hinted_handoff_enabled': False, + 'write_request_timeout_in_ms': 60000, + 'read_request_timeout_in_ms': 60000, + 'dynamic_snitch_badness_threshold': 0.0}) cluster.set_batch_commitlog(enabled=True) cluster.populate(3, tokens=[0, 2**48, 2**62]).start() node1, node2, node3 = cluster.nodelist() - debug("Set to talk to node 2") + logger.debug("Set to talk to node 2") n2session = self.patient_cql_connection(node2) create_ks(n2session, 'ks', 2) 
create_c1c2_table(self, n2session) - debug("Generating some data for all nodes") - insert_c1c2(n2session, keys=range(10, 20), consistency=ConsistencyLevel.ALL) + logger.debug("Generating some data for all nodes") + insert_c1c2(n2session, keys=list(range(10, 20)), consistency=ConsistencyLevel.ALL) node1.flush() - debug("Taking down node1") + logger.debug("Taking down node1") node1.stop(wait_other_notice=True) - debug("Writing data to node2") - insert_c1c2(n2session, keys=range(30, 1000), consistency=ConsistencyLevel.ONE) + logger.debug("Writing data to node2") + insert_c1c2(n2session, keys=list(range(30, 1000)), consistency=ConsistencyLevel.ONE) node2.flush() - debug("Restart node1") + logger.debug("Restart node1") node1.start(wait_other_notice=True) - debug("Move token on node3") + logger.debug("Move token on node3") node3.move(2) - debug("Checking that no data was lost") - for n in xrange(10, 20): + logger.debug("Checking that no data was lost") + for n in range(10, 20): query_c1c2(n2session, n, ConsistencyLevel.ALL) - for n in xrange(30, 1000): + for n in range(30, 1000): query_c1c2(n2session, n, ConsistencyLevel.ALL) - def consistent_reads_after_bootstrap_test(self): - debug("Creating a ring") + def test_consistent_reads_after_bootstrap(self): + logger.debug("Creating a ring") cluster = self.cluster - cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'write_request_timeout_in_ms': 60000, - 'read_request_timeout_in_ms': 60000, 'dynamic_snitch_badness_threshold': 0.0}) + cluster.set_configuration_options(values={'hinted_handoff_enabled': False, + 'write_request_timeout_in_ms': 60000, + 'read_request_timeout_in_ms': 60000, + 'dynamic_snitch_badness_threshold': 0.0}) cluster.set_batch_commitlog(enabled=True) cluster.populate(2) node1, node2 = cluster.nodelist() cluster.start(wait_for_binary_proto=True, wait_other_notice=True) - debug("Set to talk to node 2") + logger.debug("Set to talk to node 2") n2session = 
self.patient_cql_connection(node2) create_ks(n2session, 'ks', 2) create_c1c2_table(self, n2session) - debug("Generating some data for all nodes") - insert_c1c2(n2session, keys=range(10, 20), consistency=ConsistencyLevel.ALL) + logger.debug("Generating some data for all nodes") + insert_c1c2(n2session, keys=list(range(10, 20)), consistency=ConsistencyLevel.ALL) node1.flush() - debug("Taking down node1") + logger.debug("Taking down node1") node1.stop(wait_other_notice=True) - debug("Writing data to only node2") - insert_c1c2(n2session, keys=range(30, 1000), consistency=ConsistencyLevel.ONE) + logger.debug("Writing data to only node2") + insert_c1c2(n2session, keys=list(range(30, 1000)), consistency=ConsistencyLevel.ONE) node2.flush() - debug("Restart node1") + logger.debug("Restart node1") node1.start(wait_other_notice=True) - debug("Bootstraping node3") + logger.debug("Bootstraping node3") node3 = new_node(cluster) node3.start(wait_for_binary_proto=True) n3session = self.patient_cql_connection(node3) n3session.execute("USE ks") - debug("Checking that no data was lost") - for n in xrange(10, 20): + logger.debug("Checking that no data was lost") + for n in range(10, 20): query_c1c2(n3session, n, ConsistencyLevel.ALL) - for n in xrange(30, 1000): + for n in range(30, 1000): query_c1c2(n3session, n, ConsistencyLevel.ALL) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/counter_test.py ---------------------------------------------------------------------- diff --git a/counter_test.py b/counter_test.py new file mode 100644 index 0000000..ae87d95 --- /dev/null +++ b/counter_test.py @@ -0,0 +1,417 @@ +import random +import time +import uuid +import pytest +import logging + +from cassandra import ConsistencyLevel +from cassandra.query import SimpleStatement + +from tools.assertions import assert_invalid, assert_length_equal, assert_one +from dtest import Tester, create_ks, create_cf +from tools.data import rows_to_list + +since = pytest.mark.since +logger 
= logging.getLogger(__name__) + + +class TestCounters(Tester): + + @since('3.0', max_version='3.12') + def test_13691(self): + """ + 2.0 -> 2.1 -> 3.0 counters upgrade test + @jira_ticket CASSANDRA-13691 + """ + cluster = self.cluster + default_install_dir = cluster.get_install_dir() + + # + # set up a 2.0 cluster with 3 nodes and set up schema + # + + cluster.set_install_dir(version='2.0.17') + cluster.populate(3) + cluster.start() + + node1, node2, node3 = cluster.nodelist() + + session = self.patient_cql_connection(node1) + session.execute(""" + CREATE KEYSPACE test + WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}; + """) + session.execute("CREATE TABLE test.test (id int PRIMARY KEY, c counter);") + + # + # generate some 2.0 counter columns with local shards + # + + query = "UPDATE test.test SET c = c + 1 WHERE id = ?" + prepared = session.prepare(query) + for i in range(0, 1000): + session.execute(prepared, [i]) + + cluster.flush() + cluster.stop() + + # + # upgrade cluster to 2.1 + # + + cluster.set_install_dir(version='2.1.17') + cluster.start() + cluster.nodetool("upgradesstables") + + # + # upgrade node3 to current (3.0.x or 3.11.x) + # + + node3.stop(wait_other_notice=True) + node3.set_install_dir(install_dir=default_install_dir) + node3.start(wait_other_notice=True) + + # + # with a 2.1 coordinator, try to read the table with CL.ALL + # + + session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.ALL) + assert_one(session, "SELECT COUNT(*) FROM test.test", [1000]) + + @pytest.mark.vnodes + def test_counter_leader_with_partial_view(self): + """ + Test leader election with a starting node. + + Testing that nodes do not elect as mutation leader a node with a partial view on the cluster. 
+ Note that byteman rules can be syntax checked via the following command: + sh ./bin/bytemancheck.sh -cp ~/path_to/apache-cassandra-3.0.14-SNAPSHOT.jar ~/path_to/rule.btm + + @jira_ticket CASSANDRA-13043 + """ + cluster = self.cluster + + cluster.populate(3, use_vnodes=True, install_byteman=True) + nodes = cluster.nodelist() + # Have node 1 and 3 cheat a bit during the leader election for a counter mutation; note that cheating + # takes place iff there is an actual chance for node 2 to be picked. + if cluster.version() < '4.0': + nodes[0].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm') + nodes[2].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm') + else: + nodes[0].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm') + nodes[2].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm') + + cluster.start(wait_for_binary_proto=True) + session = self.patient_cql_connection(nodes[0]) + create_ks(session, 'ks', 3) + create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'}) + + # Now stop the node and restart but first install a rule to slow down how fast node 2 will update the list + # nodes that are alive + nodes[1].stop(wait=True, wait_other_notice=False) + nodes[1].update_startup_byteman_script('./byteman/gossip_alive_callback_sleep.btm') + nodes[1].start(no_wait=True, wait_other_notice=False) + + # Until node 2 is fully alive try to force other nodes to pick him as mutation leader. + # If CASSANDRA-13043 is fixed, they will not. Otherwise they will do, but since we are slowing down how + # fast node 2 updates the list of nodes that are alive, it will just have a partial view on the cluster + # and thus will raise an 'UnavailableException' exception. 
+ nb_attempts = 50000 + for i in range(0, nb_attempts): + # Change the name of the counter for the sake of randomization + q = SimpleStatement( + query_string="UPDATE ks.cf SET c = c + 1 WHERE key = 'counter_%d'" % i, + consistency_level=ConsistencyLevel.QUORUM + ) + session.execute(q) + + def test_simple_increment(self): + """ Simple incrementation test (Created for #3465, that wasn't a bug) """ + cluster = self.cluster + + cluster.populate(3).start() + nodes = cluster.nodelist() + + session = self.patient_cql_connection(nodes[0]) + create_ks(session, 'ks', 3) + create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'}) + + sessions = [self.patient_cql_connection(node, 'ks') for node in nodes] + nb_increment = 50 + nb_counter = 10 + + for i in range(0, nb_increment): + for c in range(0, nb_counter): + session = sessions[(i + c) % len(nodes)] + query = SimpleStatement("UPDATE cf SET c = c + 1 WHERE key = 'counter%i'" % c, consistency_level=ConsistencyLevel.QUORUM) + session.execute(query) + + session = sessions[i % len(nodes)] + keys = ",".join(["'counter%i'" % c for c in range(0, nb_counter)]) + query = SimpleStatement("SELECT key, c FROM cf WHERE key IN (%s)" % keys, consistency_level=ConsistencyLevel.QUORUM) + res = list(session.execute(query)) + + assert_length_equal(res, nb_counter) + for c in range(0, nb_counter): + assert len(res[c]) == 2, "Expecting key and counter for counter {}, got {}".format(c, str(res[c])) + assert res[c][1] == i + 1, "Expecting counter {} = {}, got {}".format(c, i + 1, res[c][0]) + + def test_upgrade(self): + """ Test for bug of #4436 """ + cluster = self.cluster + + cluster.populate(2).start() + nodes = cluster.nodelist() + + session = self.patient_cql_connection(nodes[0]) + create_ks(session, 'ks', 2) + + query = """ + CREATE TABLE counterTable ( + k int PRIMARY KEY, + c counter + ) + """ + query = query + "WITH compression = { 'sstable_compression' : 'SnappyCompressor' }" + + session.execute(query) + 
time.sleep(2) + + keys = list(range(0, 4)) + updates = 50 + + def make_updates(): + session = self.patient_cql_connection(nodes[0], keyspace='ks') + upd = "UPDATE counterTable SET c = c + 1 WHERE k = %d;" + batch = " ".join(["BEGIN COUNTER BATCH"] + [upd % x for x in keys] + ["APPLY BATCH;"]) + + for i in range(0, updates): + query = SimpleStatement(batch, consistency_level=ConsistencyLevel.QUORUM) + session.execute(query) + + def check(i): + session = self.patient_cql_connection(nodes[0], keyspace='ks') + query = SimpleStatement("SELECT * FROM counterTable", consistency_level=ConsistencyLevel.QUORUM) + rows = list(session.execute(query)) + + assert len(rows) == len(keys), "Expected {} rows, got {}: {}".format(len(keys), len(rows), str(rows)) + for row in rows: + assert row[1], i * updates == "Unexpected value {}".format(str(row)) + + def rolling_restart(): + # Rolling restart + for i in range(0, 2): + time.sleep(.2) + nodes[i].nodetool("drain") + nodes[i].stop(wait_other_notice=False) + nodes[i].start(wait_other_notice=True, wait_for_binary_proto=True) + time.sleep(.2) + + make_updates() + check(1) + rolling_restart() + + make_updates() + check(2) + rolling_restart() + + make_updates() + check(3) + rolling_restart() + + check(3) + + def test_counter_consistency(self): + """ + Do a bunch of writes with ONE, read back with ALL and check results. 
+ """ + cluster = self.cluster + cluster.populate(3).start() + node1, node2, node3 = cluster.nodelist() + session = self.patient_cql_connection(node1) + create_ks(session, 'counter_tests', 3) + + stmt = """ + CREATE TABLE counter_table ( + id uuid PRIMARY KEY, + counter_one COUNTER, + counter_two COUNTER, + ) + """ + session.execute(stmt) + + counters = [] + # establish 50 counters (2x25 rows) + for i in range(25): + _id = str(uuid.uuid4()) + counters.append( + {_id: {'counter_one': 1, 'counter_two': 1}} + ) + + query = SimpleStatement(""" + UPDATE counter_table + SET counter_one = counter_one + 1, counter_two = counter_two + 1 + where id = {uuid}""".format(uuid=_id), consistency_level=ConsistencyLevel.ONE) + session.execute(query) + + # increment a bunch of counters with CL.ONE + for i in range(10000): + counter = counters[random.randint(0, len(counters) - 1)] + counter_id = list(counter.keys())[0] + + query = SimpleStatement(""" + UPDATE counter_table + SET counter_one = counter_one + 2 + where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE) + session.execute(query) + + query = SimpleStatement(""" + UPDATE counter_table + SET counter_two = counter_two + 10 + where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE) + session.execute(query) + + query = SimpleStatement(""" + UPDATE counter_table + SET counter_one = counter_one - 1 + where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE) + session.execute(query) + + query = SimpleStatement(""" + UPDATE counter_table + SET counter_two = counter_two - 5 + where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE) + session.execute(query) + + # update expectations to match (assumed) db state + counter[counter_id]['counter_one'] += 1 + counter[counter_id]['counter_two'] += 5 + + # let's verify the counts are correct, using CL.ALL + for counter_dict in counters: + counter_id = list(counter_dict.keys())[0] 
+ + query = SimpleStatement(""" + SELECT counter_one, counter_two + FROM counter_table WHERE id = {uuid} + """.format(uuid=counter_id), consistency_level=ConsistencyLevel.ALL) + rows = list(session.execute(query)) + + counter_one_actual, counter_two_actual = rows[0] + + assert counter_one_actual == counter_dict[counter_id]['counter_one'] + assert counter_two_actual == counter_dict[counter_id]['counter_two'] + + def test_multi_counter_update(self): + """ + Test for singlular update statements that will affect multiple counters. + """ + cluster = self.cluster + cluster.populate(3).start() + node1, node2, node3 = cluster.nodelist() + session = self.patient_cql_connection(node1) + create_ks(session, 'counter_tests', 3) + + session.execute(""" + CREATE TABLE counter_table ( + id text, + myuuid uuid, + counter_one COUNTER, + PRIMARY KEY (id, myuuid)) + """) + + expected_counts = {} + + # set up expectations + for i in range(1, 6): + _id = uuid.uuid4() + + expected_counts[_id] = i + + for k, v in list(expected_counts.items()): + session.execute(""" + UPDATE counter_table set counter_one = counter_one + {v} + WHERE id='foo' and myuuid = {k} + """.format(k=k, v=v)) + + for k, v in list(expected_counts.items()): + count = list(session.execute(""" + SELECT counter_one FROM counter_table + WHERE id = 'foo' and myuuid = {k} + """.format(k=k))) + + assert v == count[0][0] + + @since("2.0", max_version="3.X") + def test_validate_empty_column_name(self): + cluster = self.cluster + cluster.populate(1).start() + node1 = cluster.nodelist()[0] + session = self.patient_cql_connection(node1) + create_ks(session, 'counter_tests', 1) + + session.execute(""" + CREATE TABLE compact_counter_table ( + pk int, + ck text, + value counter, + PRIMARY KEY (pk, ck)) + WITH COMPACT STORAGE + """) + + assert_invalid(session, "UPDATE compact_counter_table SET value = value + 1 WHERE pk = 0 AND ck = ''") + assert_invalid(session, "UPDATE compact_counter_table SET value = value - 1 WHERE pk = 0 AND ck = 
''") + + session.execute("UPDATE compact_counter_table SET value = value + 5 WHERE pk = 0 AND ck = 'ck'") + session.execute("UPDATE compact_counter_table SET value = value - 2 WHERE pk = 0 AND ck = 'ck'") + + assert_one(session, "SELECT pk, ck, value FROM compact_counter_table", [0, 'ck', 3]) + + @since('2.0') + def test_drop_counter_column(self): + """Test for CASSANDRA-7831""" + cluster = self.cluster + cluster.populate(1).start() + node1, = cluster.nodelist() + session = self.patient_cql_connection(node1) + create_ks(session, 'counter_tests', 1) + + session.execute("CREATE TABLE counter_bug (t int, c counter, primary key(t))") + + session.execute("UPDATE counter_bug SET c = c + 1 where t = 1") + row = list(session.execute("SELECT * from counter_bug")) + + assert rows_to_list(row)[0] == [1, 1] + assert len(row) == 1 + + session.execute("ALTER TABLE counter_bug drop c") + + assert_invalid(session, "ALTER TABLE counter_bug add c counter", "Cannot re-add previously dropped counter column c") + + @since("2.0", max_version="3.X") # Compact Storage + def test_compact_counter_cluster(self): + """ + @jira_ticket CASSANDRA-12219 + This test will fail on 3.0.0 - 3.0.8, and 3.1 - 3.8 + """ + cluster = self.cluster + cluster.populate(3).start() + node1 = cluster.nodelist()[0] + session = self.patient_cql_connection(node1) + create_ks(session, 'counter_tests', 1) + + session.execute(""" + CREATE TABLE IF NOT EXISTS counter_cs ( + key bigint PRIMARY KEY, + data counter + ) WITH COMPACT STORAGE + """) + + for outer in range(0, 5): + for idx in range(0, 5): + session.execute("UPDATE counter_cs SET data = data + 1 WHERE key = {k}".format(k=idx)) + + for idx in range(0, 5): + row = list(session.execute("SELECT data from counter_cs where key = {k}".format(k=idx))) + assert rows_to_list(row)[0][0] == 5 http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/counter_tests.py ---------------------------------------------------------------------- diff --git 
a/counter_tests.py b/counter_tests.py deleted file mode 100644 index 1de495d..0000000 --- a/counter_tests.py +++ /dev/null @@ -1,414 +0,0 @@ -import random -import time -import uuid - -from cassandra import ConsistencyLevel -from cassandra.query import SimpleStatement - -from tools.assertions import assert_invalid, assert_length_equal, assert_one -from dtest import Tester, create_ks, create_cf -from tools.data import rows_to_list -from tools.decorators import since - - -class TestCounters(Tester): - - @since('3.0', max_version='3.12') - def test_13691(self): - """ - 2.0 -> 2.1 -> 3.0 counters upgrade test - @jira_ticket CASSANDRA-13691 - """ - cluster = self.cluster - default_install_dir = cluster.get_install_dir() - - # - # set up a 2.0 cluster with 3 nodes and set up schema - # - - cluster.set_install_dir(version='2.0.17') - cluster.populate(3) - cluster.start() - - node1, node2, node3 = cluster.nodelist() - - session = self.patient_cql_connection(node1) - session.execute(""" - CREATE KEYSPACE test - WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}; - """) - session.execute("CREATE TABLE test.test (id int PRIMARY KEY, c counter);") - - # - # generate some 2.0 counter columns with local shards - # - - query = "UPDATE test.test SET c = c + 1 WHERE id = ?" 
- prepared = session.prepare(query) - for i in range(0, 1000): - session.execute(prepared, [i]) - - cluster.flush() - cluster.stop() - - # - # upgrade cluster to 2.1 - # - - cluster.set_install_dir(version='2.1.17') - cluster.start() - cluster.nodetool("upgradesstables") - - # - # upgrade node3 to current (3.0.x or 3.11.x) - # - - node3.stop(wait_other_notice=True) - node3.set_install_dir(install_dir=default_install_dir) - node3.start(wait_other_notice=True) - - # - # with a 2.1 coordinator, try to read the table with CL.ALL - # - - session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.ALL) - assert_one(session, "SELECT COUNT(*) FROM test.test", [1000]) - - def counter_leader_with_partial_view_test(self): - """ - Test leader election with a starting node. - - Testing that nodes do not elect as mutation leader a node with a partial view on the cluster. - Note that byteman rules can be syntax checked via the following command: - sh ./bin/bytemancheck.sh -cp ~/path_to/apache-cassandra-3.0.14-SNAPSHOT.jar ~/path_to/rule.btm - - @jira_ticket CASSANDRA-13043 - """ - cluster = self.cluster - - cluster.populate(3, use_vnodes=True, install_byteman=True) - nodes = cluster.nodelist() - # Have node 1 and 3 cheat a bit during the leader election for a counter mutation; note that cheating - # takes place iff there is an actual chance for node 2 to be picked. 
- if cluster.version() < '4.0': - nodes[0].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm') - nodes[2].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm') - else: - nodes[0].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm') - nodes[2].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm') - - cluster.start(wait_for_binary_proto=True) - session = self.patient_cql_connection(nodes[0]) - create_ks(session, 'ks', 3) - create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'}) - - # Now stop the node and restart but first install a rule to slow down how fast node 2 will update the list - # nodes that are alive - nodes[1].stop(wait=True, wait_other_notice=False) - nodes[1].update_startup_byteman_script('./byteman/gossip_alive_callback_sleep.btm') - nodes[1].start(no_wait=True, wait_other_notice=False) - - # Until node 2 is fully alive try to force other nodes to pick him as mutation leader. - # If CASSANDRA-13043 is fixed, they will not. Otherwise they will do, but since we are slowing down how - # fast node 2 updates the list of nodes that are alive, it will just have a partial view on the cluster - # and thus will raise an 'UnavailableException' exception. 
- nb_attempts = 50000 - for i in xrange(0, nb_attempts): - # Change the name of the counter for the sake of randomization - q = SimpleStatement( - query_string="UPDATE ks.cf SET c = c + 1 WHERE key = 'counter_%d'" % i, - consistency_level=ConsistencyLevel.QUORUM - ) - session.execute(q) - - def simple_increment_test(self): - """ Simple incrementation test (Created for #3465, that wasn't a bug) """ - cluster = self.cluster - - cluster.populate(3).start() - nodes = cluster.nodelist() - - session = self.patient_cql_connection(nodes[0]) - create_ks(session, 'ks', 3) - create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'}) - - sessions = [self.patient_cql_connection(node, 'ks') for node in nodes] - nb_increment = 50 - nb_counter = 10 - - for i in xrange(0, nb_increment): - for c in xrange(0, nb_counter): - session = sessions[(i + c) % len(nodes)] - query = SimpleStatement("UPDATE cf SET c = c + 1 WHERE key = 'counter%i'" % c, consistency_level=ConsistencyLevel.QUORUM) - session.execute(query) - - session = sessions[i % len(nodes)] - keys = ",".join(["'counter%i'" % c for c in xrange(0, nb_counter)]) - query = SimpleStatement("SELECT key, c FROM cf WHERE key IN (%s)" % keys, consistency_level=ConsistencyLevel.QUORUM) - res = list(session.execute(query)) - - assert_length_equal(res, nb_counter) - for c in xrange(0, nb_counter): - self.assertEqual(len(res[c]), 2, "Expecting key and counter for counter {}, got {}".format(c, str(res[c]))) - self.assertEqual(res[c][1], i + 1, "Expecting counter {} = {}, got {}".format(c, i + 1, res[c][0])) - - def upgrade_test(self): - """ Test for bug of #4436 """ - - cluster = self.cluster - - cluster.populate(2).start() - nodes = cluster.nodelist() - - session = self.patient_cql_connection(nodes[0]) - create_ks(session, 'ks', 2) - - query = """ - CREATE TABLE counterTable ( - k int PRIMARY KEY, - c counter - ) - """ - query = query + "WITH compression = { 'sstable_compression' : 'SnappyCompressor' }" - - 
session.execute(query) - time.sleep(2) - - keys = range(0, 4) - updates = 50 - - def make_updates(): - session = self.patient_cql_connection(nodes[0], keyspace='ks') - upd = "UPDATE counterTable SET c = c + 1 WHERE k = %d;" - batch = " ".join(["BEGIN COUNTER BATCH"] + [upd % x for x in keys] + ["APPLY BATCH;"]) - - for i in range(0, updates): - query = SimpleStatement(batch, consistency_level=ConsistencyLevel.QUORUM) - session.execute(query) - - def check(i): - session = self.patient_cql_connection(nodes[0], keyspace='ks') - query = SimpleStatement("SELECT * FROM counterTable", consistency_level=ConsistencyLevel.QUORUM) - rows = list(session.execute(query)) - - self.assertEqual(len(rows), len(keys), "Expected {} rows, got {}: {}".format(len(keys), len(rows), str(rows))) - for row in rows: - self.assertEqual(row[1], i * updates, "Unexpected value {}".format(str(row))) - - def rolling_restart(): - # Rolling restart - for i in range(0, 2): - time.sleep(.2) - nodes[i].nodetool("drain") - nodes[i].stop(wait_other_notice=False) - nodes[i].start(wait_other_notice=True, wait_for_binary_proto=True) - time.sleep(.2) - - make_updates() - check(1) - rolling_restart() - - make_updates() - check(2) - rolling_restart() - - make_updates() - check(3) - rolling_restart() - - check(3) - - def counter_consistency_test(self): - """ - Do a bunch of writes with ONE, read back with ALL and check results. 
- """ - cluster = self.cluster - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() - session = self.patient_cql_connection(node1) - create_ks(session, 'counter_tests', 3) - - stmt = """ - CREATE TABLE counter_table ( - id uuid PRIMARY KEY, - counter_one COUNTER, - counter_two COUNTER, - ) - """ - session.execute(stmt) - - counters = [] - # establish 50 counters (2x25 rows) - for i in xrange(25): - _id = str(uuid.uuid4()) - counters.append( - {_id: {'counter_one': 1, 'counter_two': 1}} - ) - - query = SimpleStatement(""" - UPDATE counter_table - SET counter_one = counter_one + 1, counter_two = counter_two + 1 - where id = {uuid}""".format(uuid=_id), consistency_level=ConsistencyLevel.ONE) - session.execute(query) - - # increment a bunch of counters with CL.ONE - for i in xrange(10000): - counter = counters[random.randint(0, len(counters) - 1)] - counter_id = counter.keys()[0] - - query = SimpleStatement(""" - UPDATE counter_table - SET counter_one = counter_one + 2 - where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE) - session.execute(query) - - query = SimpleStatement(""" - UPDATE counter_table - SET counter_two = counter_two + 10 - where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE) - session.execute(query) - - query = SimpleStatement(""" - UPDATE counter_table - SET counter_one = counter_one - 1 - where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE) - session.execute(query) - - query = SimpleStatement(""" - UPDATE counter_table - SET counter_two = counter_two - 5 - where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE) - session.execute(query) - - # update expectations to match (assumed) db state - counter[counter_id]['counter_one'] += 1 - counter[counter_id]['counter_two'] += 5 - - # let's verify the counts are correct, using CL.ALL - for counter_dict in counters: - counter_id = counter_dict.keys()[0] - - query 
= SimpleStatement(""" - SELECT counter_one, counter_two - FROM counter_table WHERE id = {uuid} - """.format(uuid=counter_id), consistency_level=ConsistencyLevel.ALL) - rows = list(session.execute(query)) - - counter_one_actual, counter_two_actual = rows[0] - - self.assertEqual(counter_one_actual, counter_dict[counter_id]['counter_one']) - self.assertEqual(counter_two_actual, counter_dict[counter_id]['counter_two']) - - def multi_counter_update_test(self): - """ - Test for singlular update statements that will affect multiple counters. - """ - cluster = self.cluster - cluster.populate(3).start() - node1, node2, node3 = cluster.nodelist() - session = self.patient_cql_connection(node1) - create_ks(session, 'counter_tests', 3) - - session.execute(""" - CREATE TABLE counter_table ( - id text, - myuuid uuid, - counter_one COUNTER, - PRIMARY KEY (id, myuuid)) - """) - - expected_counts = {} - - # set up expectations - for i in range(1, 6): - _id = uuid.uuid4() - - expected_counts[_id] = i - - for k, v in expected_counts.items(): - session.execute(""" - UPDATE counter_table set counter_one = counter_one + {v} - WHERE id='foo' and myuuid = {k} - """.format(k=k, v=v)) - - for k, v in expected_counts.items(): - count = list(session.execute(""" - SELECT counter_one FROM counter_table - WHERE id = 'foo' and myuuid = {k} - """.format(k=k))) - - self.assertEqual(v, count[0][0]) - - @since("2.0", max_version="3.X") - def validate_empty_column_name_test(self): - cluster = self.cluster - cluster.populate(1).start() - node1 = cluster.nodelist()[0] - session = self.patient_cql_connection(node1) - create_ks(session, 'counter_tests', 1) - - session.execute(""" - CREATE TABLE compact_counter_table ( - pk int, - ck text, - value counter, - PRIMARY KEY (pk, ck)) - WITH COMPACT STORAGE - """) - - assert_invalid(session, "UPDATE compact_counter_table SET value = value + 1 WHERE pk = 0 AND ck = ''") - assert_invalid(session, "UPDATE compact_counter_table SET value = value - 1 WHERE pk = 0 AND 
ck = ''") - - session.execute("UPDATE compact_counter_table SET value = value + 5 WHERE pk = 0 AND ck = 'ck'") - session.execute("UPDATE compact_counter_table SET value = value - 2 WHERE pk = 0 AND ck = 'ck'") - - assert_one(session, "SELECT pk, ck, value FROM compact_counter_table", [0, 'ck', 3]) - - @since('2.0') - def drop_counter_column_test(self): - """Test for CASSANDRA-7831""" - cluster = self.cluster - cluster.populate(1).start() - node1, = cluster.nodelist() - session = self.patient_cql_connection(node1) - create_ks(session, 'counter_tests', 1) - - session.execute("CREATE TABLE counter_bug (t int, c counter, primary key(t))") - - session.execute("UPDATE counter_bug SET c = c + 1 where t = 1") - row = list(session.execute("SELECT * from counter_bug")) - - self.assertEqual(rows_to_list(row)[0], [1, 1]) - self.assertEqual(len(row), 1) - - session.execute("ALTER TABLE counter_bug drop c") - - assert_invalid(session, "ALTER TABLE counter_bug add c counter", "Cannot re-add previously dropped counter column c") - - @since("2.0", max_version="3.X") # Compact Storage - def compact_counter_cluster_test(self): - """ - @jira_ticket CASSANDRA-12219 - This test will fail on 3.0.0 - 3.0.8, and 3.1 - 3.8 - """ - - cluster = self.cluster - cluster.populate(3).start() - node1 = cluster.nodelist()[0] - session = self.patient_cql_connection(node1) - create_ks(session, 'counter_tests', 1) - - session.execute(""" - CREATE TABLE IF NOT EXISTS counter_cs ( - key bigint PRIMARY KEY, - data counter - ) WITH COMPACT STORAGE - """) - - for outer in range(0, 5): - for idx in range(0, 5): - session.execute("UPDATE counter_cs SET data = data + 1 WHERE key = {k}".format(k=idx)) - - for idx in range(0, 5): - row = list(session.execute("SELECT data from counter_cs where key = {k}".format(k=idx))) - self.assertEqual(rows_to_list(row)[0][0], 5) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/cql_prepared_test.py 
---------------------------------------------------------------------- diff --git a/cql_prepared_test.py b/cql_prepared_test.py index 0dfe6f0..c039b90 100644 --- a/cql_prepared_test.py +++ b/cql_prepared_test.py @@ -1,7 +1,11 @@ import time +import pytest +import logging from dtest import Tester, create_ks -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) @since("1.2") @@ -18,7 +22,7 @@ class TestCQL(Tester): create_ks(session, 'ks', 1) return session - def batch_preparation_test(self): + def test_batch_preparation(self): """ Test preparation of batch statement (#4202) """ session = self.prepare() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org