http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_tests/thrift_upgrade_test.py ---------------------------------------------------------------------- diff --git a/upgrade_tests/thrift_upgrade_test.py b/upgrade_tests/thrift_upgrade_test.py index 0943d8a..42343a2 100644 --- a/upgrade_tests/thrift_upgrade_test.py +++ b/upgrade_tests/thrift_upgrade_test.py @@ -1,21 +1,21 @@ -# coding: utf-8 - import itertools -from unittest import skipUnless +import pytest +import logging from cassandra.query import dict_factory -from nose.tools import assert_equal, assert_not_in -from dtest import RUN_STATIC_UPGRADE_MATRIX, Tester, debug -from thrift_bindings.v22 import Cassandra -from thrift_bindings.v22.Cassandra import (Column, ColumnDef, +from dtest import RUN_STATIC_UPGRADE_MATRIX, Tester +from thrift_bindings.thrift010 import Cassandra +from thrift_bindings.thrift010.Cassandra import (Column, ColumnDef, ColumnParent, ConsistencyLevel, SlicePredicate, SliceRange) -from thrift_tests import _i64, get_thrift_client +from thrift_test import _i64, get_thrift_client from tools.assertions import assert_length_equal -from tools.decorators import since -from upgrade_base import UpgradeTester -from upgrade_manifest import build_upgrade_pairs +from .upgrade_base import UpgradeTester +from .upgrade_manifest import build_upgrade_pairs + +since = pytest.mark.since +logger = logging.getLogger(__name__) def _create_dense_super_cf(name): @@ -36,20 +36,20 @@ def _create_sparse_super_cf(name): subcomparator_type='AsciiType') -def _validate_sparse_cql(cursor, cf='sparse_super_1', column1=u'column1', col1=u'col1', col2=u'col2', key='key'): +def _validate_sparse_cql(cursor, cf='sparse_super_1', column1='column1', col1='col1', col2='col2', key='key'): cursor.execute('use ks') - assert_equal(list(cursor.execute("SELECT * FROM {}".format(cf))), + assert (list(cursor.execute("SELECT * FROM {}".format(cf))) == [{key: 'k1', column1: 'key1', col1: 200, col2: 300}, {key: 'k1', column1: 'key2', col1: 200, col2: 300}, {key: 'k2', column1: 'key1', col1: 200, col2: 300}, {key: 'k2', column1: 'key2', col1: 200, col2: 300}]) - assert_equal(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))), + assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))) == [{key: 'k1', column1: 'key1', col1: 200, col2: 300}, {key: 'k1', column1: 'key2', col1: 200, col2: 300}]) - assert_equal(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k2' AND {} = 'key1'".format(cf, key, column1))), + assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k2' AND {} = 'key1'".format(cf, key, column1))) == [{key: 'k2', column1: 'key1', col1: 200, col2: 300}]) @@ -58,35 +58,35 @@ def _validate_sparse_thrift(client, cf='sparse_super_1'): client.set_keyspace('ks') result = client.get_slice('k1', ColumnParent(cf), SlicePredicate(slice_range=SliceRange('', '', False, 5)), ConsistencyLevel.ONE) assert_length_equal(result, 2) - assert_equal(result[0].super_column.name, 'key1') - assert_equal(result[1].super_column.name, 'key2') + assert result[0].super_column.name == 'key1' + assert result[1].super_column.name == 'key2' for cosc in result: - assert_equal(cosc.super_column.columns[0].name, 'col1') - assert_equal(cosc.super_column.columns[0].value, _i64(200)) - assert_equal(cosc.super_column.columns[1].name, 'col2') - assert_equal(cosc.super_column.columns[1].value, _i64(300)) - assert_equal(cosc.super_column.columns[2].name, 'value1') - assert_equal(cosc.super_column.columns[2].value, _i64(100)) + assert cosc.super_column.columns[0].name == 'col1' + assert cosc.super_column.columns[0].value == _i64(200) + assert cosc.super_column.columns[1].name == 'col2' + assert cosc.super_column.columns[1].value == _i64(300) + assert cosc.super_column.columns[2].name == 'value1' + assert cosc.super_column.columns[2].value == _i64(100) -def _validate_dense_cql(cursor, cf='dense_super_1', key=u'key', column1=u'column1', column2=u'column2', value=u'value'): +def _validate_dense_cql(cursor, cf='dense_super_1', key='key', column1='column1', column2='column2', value='value'): cursor.execute('use ks') - assert_equal(list(cursor.execute("SELECT * FROM {}".format(cf))), + assert (list(cursor.execute("SELECT * FROM {}".format(cf))) == [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}, {key: 'k1', column1: 'key2', column2: 100, value: 'value1'}, {key: 'k2', column1: 'key1', column2: 200, value: 'value2'}, {key: 'k2', column1: 'key2', column2: 200, value: 'value2'}]) - assert_equal(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))), + assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))) == [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}, {key: 'k1', column1: 'key2', column2: 100, value: 'value1'}]) - assert_equal(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1'".format(cf, key, column1))), + assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1'".format(cf, key, column1))) == [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}]) - assert_equal(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1' AND {} = 100".format(cf, key, column1, column2))), + assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1' AND {} = 100".format(cf, key, column1, column2))) == [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}]) @@ -95,24 +95,25 @@ def _validate_dense_thrift(client, cf='dense_super_1'): client.set_keyspace('ks') result = client.get_slice('k1', ColumnParent(cf), SlicePredicate(slice_range=SliceRange('', '', False, 5)), ConsistencyLevel.ONE) assert_length_equal(result, 2) - assert_equal(result[0].super_column.name, 'key1') - assert_equal(result[1].super_column.name, 'key2') + assert result[0].super_column.name == 'key1' + assert result[1].super_column.name == 'key2' - print(result[0]) - print(result[1]) + print((result[0])) + print((result[1])) for cosc in result: - assert_equal(cosc.super_column.columns[0].name, _i64(100)) - assert_equal(cosc.super_column.columns[0].value, 'value1') + assert cosc.super_column.columns[0].name == _i64(100) + assert cosc.super_column.columns[0].value == 'value1' -class UpgradeSuperColumnsThrough(Tester): +@pytest.mark.upgrade_test +class TestUpgradeSuperColumnsThrough(Tester): def upgrade_to_version(self, tag, nodes=None): - debug('Upgrading to ' + tag) + logger.debug('Upgrading to ' + tag) if nodes is None: nodes = self.cluster.nodelist() for node in nodes: - debug('Shutting down node: ' + node.name) + logger.debug('Shutting down node: ' + node.name) node.drain() node.watch_log_for("DRAINED") node.stop(wait_other_notice=False) @@ -121,12 +122,12 @@ class UpgradeSuperColumnsThrough(Tester): for node in nodes: node.set_install_dir(version=tag) node.set_configuration_options(values={'start_rpc': 'true'}) - debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) + logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) self.cluster.set_install_dir(version=tag) # Restart nodes on new version for node in nodes: - debug('Starting %s on new version (%s)' % (node.name, tag)) + logger.debug('Starting %s on new version (%s)' % (node.name, tag)) # Setup log4j / logback again (necessary moving from 2.0 -> 2.1): node.set_log_level("INFO") node.start(wait_other_notice=True, wait_for_binary_proto=True) @@ -145,7 +146,7 @@ class UpgradeSuperColumnsThrough(Tester): cluster.start() return cluster - def dense_supercolumn_3_0_created_test(self): + def test_dense_supercolumn_3_0_created(self): cluster = self.prepare(cassandra_version='github:apache/cassandra-3.0') node = self.cluster.nodelist()[0] cursor = self.patient_cql_connection(node, row_factory=dict_factory) @@ -160,7 +161,7 @@ class UpgradeSuperColumnsThrough(Tester): client.system_add_column_family(_create_dense_super_cf('dense_super_1')) - for i in xrange(1, 3): + for i in range(1, 3): client.insert('k1', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE) client.insert('k2', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE) @@ -177,7 +178,7 @@ class UpgradeSuperColumnsThrough(Tester): _validate_dense_thrift(client, cf='dense_super_1') _validate_dense_cql(cursor, cf='dense_super_1') - def dense_supercolumn_test(self): + def test_dense_supercolumn(self): cluster = self.prepare() node = self.cluster.nodelist()[0] node.nodetool("enablethrift") @@ -193,7 +194,7 @@ class UpgradeSuperColumnsThrough(Tester): client.system_add_column_family(_create_dense_super_cf('dense_super_1')) - for i in xrange(1, 3): + for i in range(1, 3): client.insert('k1', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE) client.insert('k2', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE) @@ -218,7 +219,7 @@ class UpgradeSuperColumnsThrough(Tester): _validate_dense_thrift(client, cf='dense_super_1') _validate_dense_cql(cursor, cf='dense_super_1') - def sparse_supercolumn_test(self): + def test_sparse_supercolumn(self): cluster = self.prepare() node = self.cluster.nodelist()[0] node.nodetool("enablethrift") @@ -235,7 +236,7 @@ class UpgradeSuperColumnsThrough(Tester): cf = _create_sparse_super_cf('sparse_super_2') client.system_add_column_family(cf) - for i in xrange(1, 3): + for i in range(1, 3): client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value1", _i64(100), 0), ConsistencyLevel.ONE) client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE) client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) @@ -266,6 +267,7 @@ class UpgradeSuperColumnsThrough(Tester): _validate_sparse_cql(cursor, cf='sparse_super_2') +@pytest.mark.upgrade_test @since('2.1', max_version='4.0.0') class TestThrift(UpgradeTester): """ @@ -275,7 +277,7 @@ class TestThrift(UpgradeTester): @jira_ticket CASSANDRA-12373 """ - def dense_supercolumn_test(self): + def test_dense_supercolumn(self): cursor = self.prepare(nodes=2, rf=2, row_factory=dict_factory) cluster = self.cluster @@ -289,7 +291,7 @@ class TestThrift(UpgradeTester): client.system_add_column_family(_create_dense_super_cf('dense_super_1')) - for i in xrange(1, 3): + for i in range(1, 3): client.insert('k1', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE) client.insert('k2', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE) @@ -297,12 +299,12 @@ class TestThrift(UpgradeTester): _validate_dense_thrift(client) for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True): - debug("Querying {} node".format("upgraded" if is_upgraded else "old")) + logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) client = get_thrift_client(host, port) _validate_dense_cql(cursor) _validate_dense_thrift(client) - def dense_supercolumn_test_with_renames(self): + def test_dense_supercolumn_with_renames(self): cursor = self.prepare(row_factory=dict_factory) cluster = self.cluster @@ -317,7 +319,7 @@ class TestThrift(UpgradeTester): client.system_add_column_family(_create_dense_super_cf('dense_super_2')) - for i in xrange(1, 3): + for i in range(1, 3): client.insert('k1', ColumnParent('dense_super_2', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE) client.insert('k2', ColumnParent('dense_super_2', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE) @@ -326,16 +328,16 @@ class TestThrift(UpgradeTester): cursor.execute("ALTER TABLE ks.dense_super_2 RENAME column2 TO renamed_column2") cursor.execute("ALTER TABLE ks.dense_super_2 RENAME value TO renamed_value") - _validate_dense_cql(cursor, cf='dense_super_2', key=u'renamed_key', column1=u'renamed_column1', column2=u'renamed_column2', value=u'renamed_value') + _validate_dense_cql(cursor, cf='dense_super_2', key='renamed_key', column1='renamed_column1', column2='renamed_column2', value='renamed_value') _validate_dense_thrift(client, cf='dense_super_2') for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True): - debug("Querying {} node".format("upgraded" if is_upgraded else "old")) + logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) client = get_thrift_client(host, port) - _validate_dense_cql(cursor, cf='dense_super_2', key=u'renamed_key', column1=u'renamed_column1', column2=u'renamed_column2', value=u'renamed_value') + _validate_dense_cql(cursor, cf='dense_super_2', key='renamed_key', column1='renamed_column1', column2='renamed_column2', value='renamed_value') _validate_dense_thrift(client, cf='dense_super_2') - def sparse_supercolumn_test_with_renames(self): + def test_sparse_supercolumn_with_renames(self): cursor = self.prepare(row_factory=dict_factory) cluster = self.cluster @@ -354,7 +356,7 @@ class TestThrift(UpgradeTester): cursor.execute("ALTER TABLE ks.sparse_super_1 RENAME key TO renamed_key") cursor.execute("ALTER TABLE ks.sparse_super_1 RENAME column1 TO renamed_column1") - for i in xrange(1, 3): + for i in range(1, 3): client.insert('k1', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("value1", _i64(100), 0), ConsistencyLevel.ONE) client.insert('k1', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE) client.insert('k1', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) @@ -364,15 +366,15 @@ class TestThrift(UpgradeTester): client.insert('k2', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) _validate_sparse_thrift(client) - _validate_sparse_cql(cursor, column1=u'renamed_column1', key=u'renamed_key') + _validate_sparse_cql(cursor, column1='renamed_column1', key='renamed_key') for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True): - debug("Querying {} node".format("upgraded" if is_upgraded else "old")) + logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) client = get_thrift_client(host, port) - _validate_sparse_cql(cursor, column1=u'renamed_column1', key=u'renamed_key') + _validate_sparse_cql(cursor, column1='renamed_column1', key='renamed_key') _validate_sparse_thrift(client) - def sparse_supercolumn_test(self): + def test_sparse_supercolumn(self): cursor = self.prepare(row_factory=dict_factory) cluster = self.cluster @@ -388,7 +390,7 @@ class TestThrift(UpgradeTester): cf = _create_sparse_super_cf('sparse_super_2') client.system_add_column_family(cf) - for i in xrange(1, 3): + for i in range(1, 3): client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value1", _i64(100), 0), ConsistencyLevel.ONE) client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE) client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) @@ -401,7 +403,7 @@ class TestThrift(UpgradeTester): _validate_sparse_cql(cursor, cf='sparse_super_2') for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True): - debug("Querying {} node".format("upgraded" if is_upgraded else "old")) + logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) client = get_thrift_client(host, port) _validate_sparse_thrift(client, cf='sparse_super_2') _validate_sparse_cql(cursor, cf='sparse_super_2') @@ -422,7 +424,9 @@ for spec in specs: rf=spec['RF'], pathname=spec['UPGRADE_PATH'].name) gen_class_name = TestThrift.__name__ + suffix - assert_not_in(gen_class_name, globals()) + assert gen_class_name not in globals() upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or spec['UPGRADE_PATH'].upgrade_meta.matches_current_env_version_family - globals()[gen_class_name] = skipUnless(upgrade_applies_to_env, 'test not applicable to env.')(type(gen_class_name, (TestThrift,), spec)) + if not upgrade_applies_to_env: + pytest.mark.skip(reason='test not applicable to env.') + globals()[gen_class_name] = type(gen_class_name, (TestThrift,), spec)
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_tests/upgrade_base.py ---------------------------------------------------------------------- diff --git a/upgrade_tests/upgrade_base.py b/upgrade_tests/upgrade_base.py index 484c4bf..cf5d46f 100644 --- a/upgrade_tests/upgrade_base.py +++ b/upgrade_tests/upgrade_base.py @@ -1,13 +1,17 @@ import os import sys import time +import pytest +import logging + from abc import ABCMeta -from unittest import skipIf from ccmlib.common import get_version_from_build, is_win from tools.jmxutils import remove_perf_disable_shared_mem -from dtest import CASSANDRA_VERSION_FROM_BUILD, TRACE, DEBUG, Tester, debug, create_ks +from dtest import CASSANDRA_VERSION_FROM_BUILD, Tester, create_ks + +logger = logging.getLogger(__name__) def switch_jdks(major_version_int): @@ -25,12 +29,13 @@ def switch_jdks(major_version_int): # don't change if the same version was requested current_java_home = os.environ.get('JAVA_HOME') if current_java_home != os.environ[new_java_home]: - debug("Switching jdk to version {} (JAVA_HOME is changing from {} to {})".format(major_version_int, current_java_home or 'undefined', os.environ[new_java_home])) + logger.debug("Switching jdk to version {} (JAVA_HOME is changing from {} to {})".format(major_version_int, current_java_home or 'undefined', os.environ[new_java_home])) os.environ['JAVA_HOME'] = os.environ[new_java_home] -@skipIf(sys.platform == 'win32', 'Skip upgrade tests on Windows') -class UpgradeTester(Tester): +@pytest.mark.upgrade_test +@pytest.mark.skipif(sys.platform == 'win32', reason='Skip upgrade tests on Windows') +class UpgradeTester(Tester, metaclass=ABCMeta): """ When run in 'normal' upgrade mode without specifying any version to run, this will test different upgrade paths depending on what version of C* you @@ -38,35 +43,27 @@ class UpgradeTester(Tester): When run on 3.0, this will test the upgrade path to trunk. When run on versions above 3.0, this will test the upgrade path from 3.0 to HEAD. """ - # make this an abc so we can get all subclasses with __subclasses__() - __metaclass__ = ABCMeta NODES, RF, __test__, CL, UPGRADE_PATH = 2, 1, False, None, None - # known non-critical bug during teardown: - # https://issues.apache.org/jira/browse/CASSANDRA-12340 - if CASSANDRA_VERSION_FROM_BUILD < '2.2': - _known_teardown_race_error = ( - 'ScheduledThreadPoolExecutor$ScheduledFutureTask@[0-9a-f]+ ' - 'rejected from org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor' - ) - # don't alter ignore_log_patterns on the class, just the obj for this test - ignore_log_patterns = [_known_teardown_race_error] - - def __init__(self, *args, **kwargs): - try: - self.ignore_log_patterns - except AttributeError: - self.ignore_log_patterns = [] - - self.ignore_log_patterns = self.ignore_log_patterns[:] + [ + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + # known non-critical bug during teardown: + # https://issues.apache.org/jira/browse/CASSANDRA-12340 + if CASSANDRA_VERSION_FROM_BUILD < '2.2': + _known_teardown_race_error = ( + 'ScheduledThreadPoolExecutor$ScheduledFutureTask@[0-9a-f]+ ' + 'rejected from org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor' + ) + fixture_dtest_setup.ignore_log_patterns = fixture_dtest_setup.ignore_log_patterns \ + + [_known_teardown_race_error] + + fixture_dtest_setup.ignore_log_patterns = fixture_dtest_setup.ignore_log_patterns + [ r'RejectedExecutionException.*ThreadPoolExecutor has shut down', # see CASSANDRA-12364 ] - self.enable_for_jolokia = False - super(UpgradeTester, self).__init__(*args, **kwargs) def setUp(self): self.validate_class_config() - debug("Upgrade test beginning, setting CASSANDRA_VERSION to {}, and jdk to {}. (Prior values will be restored after test)." + logger.debug("Upgrade test beginning, setting CASSANDRA_VERSION to {}, and jdk to {}. (Prior values will be restored after test)." .format(self.UPGRADE_PATH.starting_version, self.UPGRADE_PATH.starting_meta.java_version)) switch_jdks(self.UPGRADE_PATH.starting_meta.java_version) os.environ['CASSANDRA_VERSION'] = self.UPGRADE_PATH.starting_version @@ -80,7 +77,7 @@ class UpgradeTester(Tester): cl = self.CL if cl is None else cl self.CL = cl # store for later use in do_upgrade - self.assertGreaterEqual(nodes, 2, "backwards compatibility tests require at least two nodes") + assert nodes, 2 >= "backwards compatibility tests require at least two nodes" self.protocol_version = protocol_version @@ -104,8 +101,8 @@ class UpgradeTester(Tester): cluster.populate(nodes) node1 = cluster.nodelist()[0] cluster.set_install_dir(version=self.UPGRADE_PATH.starting_version) - self.enable_for_jolokia = kwargs.pop('jolokia', False) - if self.enable_for_jolokia: + self.fixture_dtest_setup.enable_for_jolokia = kwargs.pop('jolokia', False) + if self.fixture_dtest_setup.enable_for_jolokia: remove_perf_disable_shared_mem(node1) cluster.start(wait_for_binary_proto=True) @@ -147,7 +144,7 @@ class UpgradeTester(Tester): if is_win() and self.cluster.version() <= '2.2': node1.mark_log_for_errors() - debug('upgrading node1 to {}'.format(self.UPGRADE_PATH.upgrade_version)) + logger.debug('upgrading node1 to {}'.format(self.UPGRADE_PATH.upgrade_version)) switch_jdks(self.UPGRADE_PATH.upgrade_meta.java_version) node1.set_install_dir(version=self.UPGRADE_PATH.upgrade_version) @@ -159,18 +156,19 @@ class UpgradeTester(Tester): # The since decorator can only check the starting version of the upgrade, # so here we check to new version of the upgrade as well. if hasattr(self, 'max_version') and self.max_version is not None and new_version_from_build >= self.max_version: - self.skip("Skipping test, new version {} is equal to or higher than max version {}".format(new_version_from_build, self.max_version)) + pytest.skip("Skipping test, new version {} is equal to or higher than " + "max version {}".format(new_version_from_build, self.max_version)) if (new_version_from_build >= '3' and self.protocol_version is not None and self.protocol_version < 3): - self.skip('Protocol version {} incompatible ' - 'with Cassandra version {}'.format(self.protocol_version, new_version_from_build)) - node1.set_log_level("DEBUG" if DEBUG else "TRACE" if TRACE else "INFO") + pytest.skip('Protocol version {} incompatible ' + 'with Cassandra version {}'.format(self.protocol_version, new_version_from_build)) + node1.set_log_level(logging.getLevelName(logging.root.level)) node1.set_configuration_options(values={'internode_compression': 'none'}) if use_thrift: node1.set_configuration_options(values={'start_rpc': 'true'}) - if self.enable_for_jolokia: + if self.fixture_dtest_setup.enable_for_jolokia: remove_perf_disable_shared_mem(node1) node1.start(wait_for_binary_proto=True, wait_other_notice=True) @@ -223,7 +221,7 @@ class UpgradeTester(Tester): Used in places where is_upgraded was used to determine if the node version was >=2.2. """ node_versions = self.get_node_versions() - self.assertLessEqual(len({v.vstring for v in node_versions}), 2) + assert len({v.vstring for v in node_versions}) <= 2 return max(node_versions) if is_upgraded else min(node_versions) def tearDown(self): @@ -246,4 +244,4 @@ class UpgradeTester(Tester): if subclasses else '') ) - self.assertIsNotNone(self.UPGRADE_PATH, no_upgrade_path_error) + assert self.UPGRADE_PATH is not None, no_upgrade_path_error http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_tests/upgrade_compact_storage.py ---------------------------------------------------------------------- diff --git a/upgrade_tests/upgrade_compact_storage.py b/upgrade_tests/upgrade_compact_storage.py index 085d3a3..ed85515 100644 --- a/upgrade_tests/upgrade_compact_storage.py +++ b/upgrade_tests/upgrade_compact_storage.py @@ -1,28 +1,30 @@ -# coding: utf-8 - import time +import pytest +import logging from cassandra.query import dict_factory -from nose.tools import assert_equal, assert_true from ccmlib.node import NodeError -from dtest import Tester, debug +from dtest import Tester from cassandra.protocol import ConfigurationException -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) VERSION_311 = 'github:apache/cassandra-3.11' VERSION_TRUNK = 'github:apache/trunk' +@pytest.mark.upgrade_test @since('4.0') -class UpgradeSuperColumnsThrough(Tester): +class TestUpgradeSuperColumnsThrough(Tester): def upgrade_to_version(self, tag, start_rpc=True, wait=True, nodes=None): - debug('Upgrading to ' + tag) + logger.debug('Upgrading to ' + tag) if nodes is None: nodes = self.cluster.nodelist() for node in nodes: - debug('Shutting down node: ' + node.name) + logger.debug('Shutting down node: ' + node.name) node.drain() node.watch_log_for("DRAINED") node.stop(wait_other_notice=False) @@ -30,12 +32,12 @@ class UpgradeSuperColumnsThrough(Tester): # Update Cassandra Directory for node in nodes: node.set_install_dir(version=tag) - debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) + logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) self.cluster.set_install_dir(version=tag) # Restart nodes on new version for node in nodes: - debug('Starting %s on new version (%s)' % (node.name, tag)) + logger.debug('Starting %s on new version (%s)' % (node.name, tag)) node.start(wait_other_notice=wait, wait_for_binary_proto=wait) def prepare(self, num_nodes=1, cassandra_version="github:apache/cassandra-2.2"): @@ -49,7 +51,7 @@ class UpgradeSuperColumnsThrough(Tester): cluster.start() return cluster - def upgrade_compact_storage_test(self): + def test_upgrade_compact_storage(self): cluster = self.prepare(cassandra_version='github:apache/cassandra-3.0') node = self.cluster.nodelist()[0] session = self.patient_cql_connection(node, row_factory=dict_factory) @@ -57,18 +59,18 @@ class UpgradeSuperColumnsThrough(Tester): session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };") session.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE") - for i in xrange(1, 5): + for i in range(1, 5): session.execute("INSERT INTO ks.compact_table (pk, col1, col2) VALUES ({i}, {i}, {i})".format(i=i)) self.upgrade_to_version(VERSION_TRUNK, wait=False) - self.allow_log_errors = True + self.fixture_dtest_setup.allow_log_errors = True time.sleep(5) # After restart, it won't start errors = len(node.grep_log("Compact Tables are not allowed in Cassandra starting with 4.0 version")) - assert_true(errors > 0) + assert errors > 0 - def mixed_cluster_test(self): + def test_mixed_cluster(self): cluster = self.prepare(num_nodes=2, cassandra_version=VERSION_311) node1, node2 = self.cluster.nodelist() @@ -88,9 +90,9 @@ class UpgradeSuperColumnsThrough(Tester): except ConfigurationException: thrown = True - assert_true(thrown) + assert thrown - def upgrade_with_dropped_compact_storage_test(self): + def test_upgrade_with_dropped_compact_storage(self): cluster = self.prepare(cassandra_version=VERSION_311) node = self.cluster.nodelist()[0] session = self.patient_cql_connection(node, row_factory=dict_factory) @@ -98,7 +100,7 @@ class UpgradeSuperColumnsThrough(Tester): session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };") session.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE") - for i in xrange(1, 5): + for i in range(1, 5): session.execute("INSERT INTO ks.compact_table (pk, col1, col2) VALUES ({i}, {i}, {i})".format(i=i)) session.execute("ALTER TABLE ks.compact_table DROP COMPACT STORAGE") @@ -106,10 +108,10 @@ class UpgradeSuperColumnsThrough(Tester): self.upgrade_to_version(VERSION_TRUNK, wait=True) session = self.patient_cql_connection(node, row_factory=dict_factory) - assert_equal(list(session.execute("SELECT * FROM ks.compact_table WHERE pk = 1")), - [{u'col2': 1, u'pk': 1, u'column1': None, u'value': None, u'col1': 1}]) + assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = 1")) == + [{'col2': 1, 'pk': 1, 'column1': None, 'value': None, 'col1': 1}]) - def force_readd_compact_storage_test(self): + def test_force_readd_compact_storage(self): cluster = self.prepare(cassandra_version=VERSION_311) node = self.cluster.nodelist()[0] session = self.patient_cql_connection(node, row_factory=dict_factory) @@ -117,7 +119,7 @@ class UpgradeSuperColumnsThrough(Tester): session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };") session.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE") - for i in xrange(1, 5): + for i in range(1, 5): session.execute("INSERT INTO ks.compact_table (pk, col1, col2) VALUES ({i}, {i}, {i})".format(i=i)) session.execute("ALTER TABLE ks.compact_table DROP COMPACT STORAGE") @@ -127,23 +129,23 @@ class UpgradeSuperColumnsThrough(Tester): session = self.patient_cql_connection(node, row_factory=dict_factory) session.execute("update system_schema.tables set flags={} where keyspace_name='ks' and table_name='compact_table';") - assert_equal(list(session.execute("SELECT * FROM ks.compact_table WHERE pk = 1")), - [{u'col2': 1, u'pk': 1, u'column1': None, u'value': None, u'col1': 1}]) + assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = 1")) == + [{'col2': 1, 'pk': 1, 'column1': None, 'value': None, 'col1': 1}]) - self.allow_log_errors = True + self.fixture_dtest_setup.allow_log_errors = True node.stop(wait_other_notice=False) node.set_install_dir(version=VERSION_TRUNK) try: node.start(wait_other_notice=False, wait_for_binary_proto=False, verbose=False) except (NodeError): - print "error" # ignore + print("error") # ignore time.sleep(5) # After restart, it won't start errors = len(node.grep_log("Compact Tables are not allowed in Cassandra starting with 4.0 version")) - assert_true(errors > 0) + assert errors > 0 - def upgrade_with_dropped_compact_storage_index_test(self): + def test_upgrade_with_dropped_compact_storage_index(self): cluster = self.prepare(cassandra_version=VERSION_311) node = self.cluster.nodelist()[0] session = self.patient_cql_connection(node, row_factory=dict_factory) @@ -152,25 +154,25 @@ class UpgradeSuperColumnsThrough(Tester): session.execute("CREATE TABLE ks.compact_table (pk ascii PRIMARY KEY, col1 ascii) WITH COMPACT STORAGE") session.execute("CREATE INDEX ON ks.compact_table(col1)") - for i in xrange(1, 10): + for i in range(1, 10): session.execute("INSERT INTO ks.compact_table (pk, col1) VALUES ('{pk}', '{col1}')".format(pk=i, col1=i * 10)) - assert_equal(list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")), - [{u'pk': '5', u'col1': '50'}]) - assert_equal(list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")), - [{u'pk': '5', u'col1': '50'}]) + assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) == + [{'pk': '5', 'col1': '50'}]) + assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) == + [{'pk': '5', 'col1': '50'}]) session.execute("ALTER TABLE ks.compact_table DROP COMPACT STORAGE") - assert_equal(list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")), - [{u'col1': '50', u'column1': None, u'pk': '5', u'value': None}]) - assert_equal(list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")), - [{u'col1': '50', u'column1': None, u'pk': '5', u'value': None}]) + assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) == + [{'col1': '50', 'column1': None, 'pk': '5', 'value': None}]) + assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) == + [{'col1': '50', 'column1': None, 'pk': '5', 'value': None}]) self.upgrade_to_version(VERSION_TRUNK, wait=True) session = self.patient_cql_connection(node, row_factory=dict_factory) - assert_equal(list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")), - [{u'col1': '50', u'column1': None, u'pk': '5', u'value': None}]) - assert_equal(list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")), - [{u'col1': '50', u'column1': None, u'pk': '5', u'value': None}]) + assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) == + [{'col1': '50', 'column1': None, 'pk': '5', 'value': None}]) + assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) == + [{'col1': '50', 'column1': None, 'pk': '5', 'value': None}]) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_tests/upgrade_manifest.py ---------------------------------------------------------------------- diff --git a/upgrade_tests/upgrade_manifest.py b/upgrade_tests/upgrade_manifest.py index d5ed776..ce9442f 100644 --- a/upgrade_tests/upgrade_manifest.py +++ b/upgrade_tests/upgrade_manifest.py @@ -1,7 +1,11 @@ +import logging + from collections import namedtuple from dtest import (CASSANDRA_GITREF, CASSANDRA_VERSION_FROM_BUILD, - RUN_STATIC_UPGRADE_MATRIX, debug) + RUN_STATIC_UPGRADE_MATRIX) + +logger = logging.getLogger(__name__) # UpgradePath's contain data about upgrade paths we wish to test # They also contain VersionMeta's for each version the path is testing @@ -156,18 +160,18 @@ def build_upgrade_pairs(): valid_upgrade_pairs = [] manifest = OVERRIDE_MANIFEST or MANIFEST - for origin_meta, destination_metas in manifest.items(): + for origin_meta, destination_metas in list(manifest.items()): for destination_meta in destination_metas: if not (origin_meta and destination_meta): # None means we don't care about that version, which means we don't care about iterations involving it either - debug("skipping class creation as a version is undefined (this is normal), versions: {} and {}".format(origin_meta, destination_meta)) + logger.debug("skipping class creation as a version is undefined (this is normal), versions: {} and {}".format(origin_meta, destination_meta)) continue if not _is_targeted_variant_combo(origin_meta, destination_meta): - debug("skipping class creation, no testing of '{}' to '{}' (for {} upgrade to {})".format(origin_meta.variant, destination_meta.variant, origin_meta.name, destination_meta.name)) + logger.debug("skipping class creation, no testing of '{}' to '{}' (for {} upgrade to {})".format(origin_meta.variant, destination_meta.variant, origin_meta.name, destination_meta.name)) continue if not _have_common_proto(origin_meta, destination_meta): - debug("skipping class creation, no compatible protocol version between {} and {}".format(origin_meta.name, destination_meta.name)) + logger.debug("skipping class creation, no compatible protocol version between {} and {}".format(origin_meta.name, destination_meta.name)) continue path_name = 'Upgrade_' + origin_meta.name + '_To_' + destination_meta.name @@ -177,7 +181,7 @@ def build_upgrade_pairs(): # looks like this test should actually run in the current env, so let's set the final version to match the env exactly oldmeta = destination_meta newmeta = destination_meta.clone_with_local_env_version() - debug("{} appears applicable to current env. Overriding final test version from {} to {}".format(path_name, oldmeta.version, newmeta.version)) + logger.debug("{} appears applicable to current env. Overriding final test version from {} to {}".format(path_name, oldmeta.version, newmeta.version)) destination_meta = newmeta valid_upgrade_pairs.append( http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_tests/upgrade_schema_agreement_test.py ---------------------------------------------------------------------- diff --git a/upgrade_tests/upgrade_schema_agreement_test.py b/upgrade_tests/upgrade_schema_agreement_test.py index 8541411..4c8d942 100644 --- a/upgrade_tests/upgrade_schema_agreement_test.py +++ b/upgrade_tests/upgrade_schema_agreement_test.py @@ -1,8 +1,13 @@ import re import time +import pytest +import logging from ccmlib.node import TimeoutError -from dtest import Tester, debug +from dtest import Tester + +since = pytest.mark.since +logger = logging.getLogger(__name__) class TestSchemaAgreementUpgrade(Tester): @@ -33,15 +38,14 @@ class TestSchemaAgreementUpgrade(Tester): # The number of seconds we wait for schema migration log entries to verify migration_check_time = 30 - def __init__(self, *args, **kwargs): - self.ignore_log_patterns = [ + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = ( # This one occurs if we do a non-rolling upgrade, the node # it's trying to send the migration to hasn't started yet, # and when it does, it gets replayed and everything is fine. r'Can\'t send migration request: node.*is down', - ] - - Tester.__init__(self, *args, **kwargs) + ) def _prepare(self, version, num_nodes=3): cluster = self.cluster @@ -65,7 +69,7 @@ class TestSchemaAgreementUpgrade(Tester): expressions = [" - [pP]ulling schema from endpoint", " - [Ss]ubmitting migration task", " - [Pp]ulled schema from endpoint"] - debug("Inspecting log files of {}...".format([n.name for n in nodes])) + logger.debug("Inspecting log files of {}...".format([n.name for n in nodes])) all_matchings = "" for node in nodes: try: @@ -73,11 +77,11 @@ class TestSchemaAgreementUpgrade(Tester): all_matchings = all_matchings + "\n{}: {}".format(node.name, matchings) except TimeoutError: # good - debug(" {}: log files don't show schema migration messages (good)".format(node.name)) + logger.debug(" {}: log files don't show schema migration messages (good)".format(node.name)) if all_matchings != "": msg = "Expected no schema migration log entries, but got:{}".format(all_matchings) - debug(msg) # debug message for the validation test case (3.0 vs 3.11.1) - self.fail(msg) + logger.debug(msg) # debug message for the validation test case (3.0 vs 3.11.1) + pytest.fail(msg) def _wait_for_status_normal(self, node, mark): # Wait until the node is in state NORMAL (otherwise we can expect @@ -86,13 +90,13 @@ class TestSchemaAgreementUpgrade(Tester): from_mark=mark, timeout=300, filename='debug.log') def _bounce_node(self, node): - debug("Bouncing {}...".format(node.name)) - debug(" Stopping...") + logger.debug("Bouncing {}...".format(node.name)) + logger.debug(" Stopping...") node.stop(wait_other_notice=False) # intentionally set to wait_other_notice=False mark = node.mark_log(filename='debug.log') - debug(" Starting...") + logger.debug(" Starting...") node.start(wait_other_notice=False) # intentionally set to wait_other_notice=False - debug(" Waiting for status NORMAL...") + logger.debug(" Waiting for status NORMAL...") self._wait_for_status_normal(node, mark) def _min_version(self, nodes): @@ -103,7 +107,7 @@ class TestSchemaAgreementUpgrade(Tester): min_version = 99.9 for node in nodes: short_version = node.get_base_cassandra_version() - debug("{} is on {} ({})".format(node.name, short_version, node.get_cassandra_version())) + logger.debug("{} is on {} ({})".format(node.name, short_version, node.get_cassandra_version())) if short_version < min_version: min_version = short_version return min_version @@ -131,29 +135,29 @@ class TestSchemaAgreementUpgrade(Tester): """ # prepare the cluster with initial version from the upgrade path - debug('Starting upgrade test with {}'.format(upgrade_path[0][1])) + logger.debug('Starting upgrade test with {}'.format(upgrade_path[0][1])) cluster = self._prepare(version=upgrade_path[0][1]) nodes = self.cluster.nodelist() # perform _rolling_ upgrades from one version to another for (gossip_log_with_product_version, version) in upgrade_path[1:]: - debug("") - debug("Upgrading cluster to {}".format(version)) + logger.debug("") + logger.debug("Upgrading cluster to {}".format(version)) cluster.set_install_dir(version=version) for node in nodes: other_nodes = [n for n in nodes if n != node] - debug("") - debug("Stopping {} for upgrade...".format(node.name)) + logger.debug("") + logger.debug("Stopping {} for upgrade...".format(node.name)) # needed to "patch" the config file (especially since 4.0) and get the correct version number node.set_install_dir(version=version) node.stop(wait_other_notice=False) # intentionally set to wait_other_notice=False # remember the logfile-mark when the node was upgraded upgrade_log_mark = node.mark_log(filename='debug.log') - debug("Starting upgraded {}...".format(node.name)) + logger.debug("Starting upgraded {}...".format(node.name)) node.start(wait_other_notice=False) # intentionally set to wait_other_notice=False # wait until the upgraded node is in status NORMAL @@ -161,25 +165,25 @@ class TestSchemaAgreementUpgrade(Tester): # If it's a 3.11.2 node, check that the correct schema version is announced min_version = self._min_version(nodes) - debug("Minimum version: {}".format(min_version)) + logger.debug("Minimum version: {}".format(min_version)) if gossip_log_with_product_version: # 3.11.2 nodes (and only 3.11.2) indicate whether they announce # a "3.0 compatible" or "real" "3.11" schema version. watch_part = "Gossiping my {} schema version".format("3.0 compatible" if min_version == 3.0 else "3.11") - debug("Inspecting log for '{}'...".format(watch_part)) + logger.debug("Inspecting log for '{}'...".format(watch_part)) matchings = node.watch_log_for(watch_part, from_mark=upgrade_log_mark, timeout=120, filename='debug.log') - debug(" Found: {}".format(matchings)) + logger.debug(" Found: {}".format(matchings)) # Only log the schema information for debug purposes here. Primarily want to catch the # schema migration race. for n in nodes: out, _, _ = n.nodetool("describecluster") - debug("nodetool describecluster of {}:".format(n.name)) - debug(out) + logger.debug("nodetool describecluster of {}:".format(n.name)) + logger.debug(out) # We expect no schema migrations at this point. self._set_verify_log_mark(other_nodes) - debug(" Sleep for {} seconds...".format(self.migration_check_time)) + logger.debug(" Sleep for {} seconds...".format(self.migration_check_time)) time.sleep(self.migration_check_time) self._expect_no_schema_migrations(other_nodes) @@ -188,11 +192,11 @@ class TestSchemaAgreementUpgrade(Tester): # the whole endpoint state to propagate - including the schema version, which, in theory, # should trigger the race. # It is expected, that the _other_ nodes do not try to pull the schema. - debug("") - debug("Try to trigger schema migration race by bouncing the upgraded node") + logger.debug("") + logger.debug("Try to trigger schema migration race by bouncing the upgraded node") self._bounce_node(node) self._set_verify_log_mark(other_nodes) - debug(" Sleep for {} seconds...".format(self.migration_check_time)) + logger.debug(" Sleep for {} seconds...".format(self.migration_check_time)) time.sleep(self.migration_check_time) self._expect_no_schema_migrations(other_nodes) @@ -201,8 +205,8 @@ class TestSchemaAgreementUpgrade(Tester): # only want to have one schema version. for n in nodes: out, _, _ = n.nodetool("describecluster") - debug("nodetool describecluster of {}:".format(n.name)) - debug(out) + logger.debug("nodetool describecluster of {}:".format(n.name)) + logger.debug(out) versions = out.split('Schema versions:')[1].strip() num_schemas = len(re.findall('\[.*?\]', versions)) self.assertEqual(num_schemas, 1, "Multiple schema versions detected on {}: {}".format(n.name, out)) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_tests/upgrade_supercolumns_test.py ---------------------------------------------------------------------- diff --git a/upgrade_tests/upgrade_supercolumns_test.py b/upgrade_tests/upgrade_supercolumns_test.py index 6ba9db1..28bf47c 100644 --- a/upgrade_tests/upgrade_supercolumns_test.py +++ b/upgrade_tests/upgrade_supercolumns_test.py @@ -1,12 +1,24 @@ import os +import pytest +import logging -from collections import OrderedDict - -from dtest import CASSANDRA_VERSION_FROM_BUILD, Tester, debug -from pycassa.pool import ConnectionPool -from pycassa.columnfamily import ColumnFamily +from dtest import CASSANDRA_VERSION_FROM_BUILD, Tester +from thrift_test import get_thrift_client from tools.assertions import assert_all +from thrift_bindings.thrift010.Cassandra import (CfDef, Column, ColumnDef, + ColumnOrSuperColumn, ColumnParent, + ColumnPath, ColumnSlice, + ConsistencyLevel, CounterColumn, + Deletion, IndexExpression, + IndexOperator, IndexType, + InvalidRequestException, KeyRange, + KeySlice, KsDef, MultiSliceRequest, + Mutation, NotFoundException, + SlicePredicate, SliceRange, + SuperColumn) + +logger = logging.getLogger(__name__) # Use static supercolumn data to reduce total test time and avoid driver issues connecting to C* 1.2. # The data contained in the SSTables is (name, {'attr': {'name': name}}) for the name in NAMES. @@ -15,28 +27,28 @@ TABLES_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra NAMES = ["Alice", "Bob", "Claire", "Dave", "Ed", "Frank", "Grace"] +@pytest.mark.upgrade_test class TestSCUpgrade(Tester): """ Tests upgrade between a 2.0 cluster with predefined super columns and all other versions. Verifies data with both CQL and Thrift. """ - - def __init__(self, *args, **kwargs): - self.ignore_log_patterns = [ + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.allow_log_errors = True + fixture_dtest_setup.ignore_log_patterns = ( # This one occurs if we do a non-rolling upgrade, the node # it's trying to send the migration to hasn't started yet, # and when it does, it gets replayed and everything is fine. r'Can\'t send migration request: node.*is down', - ] + ) if CASSANDRA_VERSION_FROM_BUILD < '2.2': _known_teardown_race_error = ( 'ScheduledThreadPoolExecutor$ScheduledFutureTask@[0-9a-f]+ ' 'rejected from org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor' ) # don't alter ignore_log_patterns on the class, just the obj for this test - self.ignore_log_patterns += [_known_teardown_race_error] - - Tester.__init__(self, *args, **kwargs) + fixture_dtest_setup.ignore_log_patterns += [_known_teardown_race_error] def prepare(self, num_nodes=1, cassandra_version="git:cassandra-2.1"): cluster = self.cluster @@ -54,15 +66,20 @@ class TestSCUpgrade(Tester): if self.cluster.version() >= '4': return - pool = ConnectionPool("supcols", pool_size=1) - super_col_fam = ColumnFamily(pool, "cols") + node = self.cluster.nodelist()[0] + host, port = node.network_interfaces['thrift'] + client = get_thrift_client(host, port) + client.transport.open() + client.set_keyspace('supcols') + p = SlicePredicate(slice_range=SliceRange('', '', False, 1000)) for name in NAMES: - super_col_value = super_col_fam.get(name) - self.assertEqual(OrderedDict([(('attr', u'name'), name)]), super_col_value) + super_col_value = client.get_slice(name, ColumnParent("cols"), p, ConsistencyLevel.ONE) + logger.debug("get_slice(%s) returned %s" % (name, super_col_value)) + assert name == super_col_value[0].column.value def verify_with_cql(self, session): session.execute("USE supcols") - expected = [[name, 'attr', u'name', name] for name in ['Grace', 'Claire', 'Dave', 'Frank', 'Ed', 'Bob', 'Alice']] + expected = [[name, 'attr', 'name', name] for name in ['Grace', 'Claire', 'Dave', 'Frank', 'Ed', 'Bob', 'Alice']] assert_all(session, "SELECT * FROM cols", expected) def _upgrade_super_columns_through_versions_test(self, upgrade_path): @@ -118,20 +135,20 @@ class TestSCUpgrade(Tester): cluster.remove(node=node1) - def upgrade_super_columns_through_all_versions_test(self): - self._upgrade_super_columns_through_versions_test(upgrade_path=['git:cassandra-2.2', 'git:cassandra-3.X', - 'git:trunk']) + def test_upgrade_super_columns_through_all_versions(self): + self._upgrade_super_columns_through_versions_test(upgrade_path=['git:cassandra-2.2', 'git:cassandra-3.0', + 'git:cassandra-3.11', 'git:trunk']) - def upgrade_super_columns_through_limited_versions_test(self): + def test_upgrade_super_columns_through_limited_versions(self): self._upgrade_super_columns_through_versions_test(upgrade_path=['git:cassandra-3.0', 'git:trunk']) def upgrade_to_version(self, tag, nodes=None): - debug('Upgrading to ' + tag) + logger.debug('Upgrading to ' + tag) if nodes is None: nodes = self.cluster.nodelist() for node in nodes: - debug('Shutting down node: ' + node.name) + logger.debug('Shutting down node: ' + node.name) node.drain() node.watch_log_for("DRAINED") node.stop(wait_other_notice=False) @@ -142,12 +159,12 @@ class TestSCUpgrade(Tester): if tag < "2.1": if "memtable_allocation_type" in node.config_options: node.config_options.__delitem__("memtable_allocation_type") - debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) + logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) self.cluster.set_install_dir(version=tag) # Restart nodes on new version for node in nodes: - debug('Starting %s on new version (%s)' % (node.name, tag)) + logger.debug('Starting %s on new version (%s)' % (node.name, tag)) # Setup log4j / logback again (necessary moving from 2.0 -> 2.1): node.set_log_level("INFO") node.start(wait_other_notice=True, wait_for_binary_proto=True) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/upgrade_tests/upgrade_through_versions_test.py ---------------------------------------------------------------------- diff --git a/upgrade_tests/upgrade_through_versions_test.py b/upgrade_tests/upgrade_through_versions_test.py index a825645..397ea15 100644 --- a/upgrade_tests/upgrade_through_versions_test.py +++ b/upgrade_tests/upgrade_through_versions_test.py @@ -5,24 +5,26 @@ import random import signal import time import uuid +import logging +import pytest +import psutil + from collections import defaultdict, namedtuple from multiprocessing import Process, Queue -from Queue import Empty, Full -from unittest import skipUnless +from queue import Empty, Full -import psutil from cassandra import ConsistencyLevel, WriteTimeout from cassandra.query import SimpleStatement -from nose.plugins.attrib import attr -from six import print_ -from dtest import RUN_STATIC_UPGRADE_MATRIX, Tester, debug +from dtest import RUN_STATIC_UPGRADE_MATRIX, Tester from tools.misc import generate_ssl_stores, new_node -from upgrade_base import switch_jdks -from upgrade_manifest import (build_upgrade_pairs, current_2_0_x, +from .upgrade_base import switch_jdks +from .upgrade_manifest import (build_upgrade_pairs, current_2_0_x, current_2_1_x, current_2_2_x, current_3_0_x, indev_2_2_x, indev_3_x) +logger = logging.getLogger(__name__) + def data_writer(tester, to_verify_queue, verification_done_queue, rewrite_probability=0): """ @@ -67,7 +69,7 @@ def data_writer(tester, to_verify_queue, verification_done_queue, rewrite_probab to_verify_queue.put_nowait((key, val,)) except Exception: - debug("Error in data writer process!") + logger.debug("Error in data writer process!") to_verify_queue.close() raise @@ -107,7 +109,7 @@ def data_checker(tester, to_verify_queue, verification_done_queue): time.sleep(0.1) # let's not eat CPU if the queue is empty continue except Exception: - debug("Error in data verifier process!") + logger.debug("Error in data verifier process!") verification_done_queue.close() raise else: @@ -165,7 +167,7 @@ def counter_incrementer(tester, to_verify_queue, verification_done_queue, rewrit to_verify_queue.put_nowait((key, count + 1,)) except Exception: - debug("Error in counter incrementer process!") + logger.debug("Error in counter incrementer process!") to_verify_queue.close() raise @@ -205,7 +207,7 @@ def counter_checker(tester, to_verify_queue, verification_done_queue): time.sleep(0.1) # let's not eat CPU if the queue is empty continue except Exception: - debug("Error in counter verifier process!") + logger.debug("Error in counter verifier process!") verification_done_queue.close() raise else: @@ -221,70 +223,69 @@ def counter_checker(tester, to_verify_queue, verification_done_queue): pass -@attr("resource-intensive") -class UpgradeTester(Tester): +@pytest.mark.upgrade_test +@pytest.mark.resource_intensive +class TestUpgrade(Tester): """ Upgrades a 3-node Murmur3Partitioner cluster through versions specified in test_version_metas. """ test_version_metas = None # set on init to know which versions to use subprocs = None # holds any subprocesses, for status checking and cleanup extra_config = None # holds a non-mutable structure that can be cast as dict() - __test__ = False # this is a base class only - ignore_log_patterns = ( - # This one occurs if we do a non-rolling upgrade, the node - # it's trying to send the migration to hasn't started yet, - # and when it does, it gets replayed and everything is fine. - r'Can\'t send migration request: node.*is down', - r'RejectedExecutionException.*ThreadPoolExecutor has shut down', - # Occurs due to test/ccm writing topo on down nodes - r'Cannot update data center or rack from.*for live host', - # Normal occurance. See CASSANDRA-12026. Likely won't be needed after C* 4.0. - r'Unknown column cdc during deserialization', - ) - def __init__(self, *args, **kwargs): - self.subprocs = [] - Tester.__init__(self, *args, **kwargs) + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = ( + # This one occurs if we do a non-rolling upgrade, the node + # it's trying to send the migration to hasn't started yet, + # and when it does, it gets replayed and everything is fine. + r'Can\'t send migration request: node.*is down', + r'RejectedExecutionException.*ThreadPoolExecutor has shut down', + # Occurs due to test/ccm writing topo on down nodes + r'Cannot update data center or rack from.*for live host', + # Normal occurance. See CASSANDRA-12026. Likely won't be needed after C* 4.0. + r'Unknown column cdc during deserialization', + ) def setUp(self): - debug("Upgrade test beginning, setting CASSANDRA_VERSION to {}, and jdk to {}. (Prior values will be restored after test)." + logger.debug("Upgrade test beginning, setting CASSANDRA_VERSION to {}, and jdk to {}. (Prior values will be restored after test)." .format(self.test_version_metas[0].version, self.test_version_metas[0].java_version)) os.environ['CASSANDRA_VERSION'] = self.test_version_metas[0].version switch_jdks(self.test_version_metas[0].java_version) - super(UpgradeTester, self).setUp() - debug("Versions to test (%s): %s" % (type(self), str([v.version for v in self.test_version_metas]))) + super(TestUpgrade, self).setUp() + logger.debug("Versions to test (%s): %s" % (type(self), str([v.version for v in self.test_version_metas]))) def init_config(self): Tester.init_config(self) if self.extra_config is not None: - debug("Setting extra configuration options:\n{}".format( + logger.debug("Setting extra configuration options:\n{}".format( pprint.pformat(dict(self.extra_config), indent=4)) ) self.cluster.set_configuration_options( values=dict(self.extra_config) ) - def parallel_upgrade_test(self): + def test_parallel_upgrade(self): """ Test upgrading cluster all at once (requires cluster downtime). """ self.upgrade_scenario() - def rolling_upgrade_test(self): + def test_rolling_upgrade(self): """ Test rolling upgrade of the cluster, so we have mixed versions part way through. """ self.upgrade_scenario(rolling=True) - def parallel_upgrade_with_internode_ssl_test(self): + def test_parallel_upgrade_with_internode_ssl(self): """ Test upgrading cluster all at once (requires cluster downtime), with internode ssl. """ self.upgrade_scenario(internode_ssl=True) - def rolling_upgrade_with_internode_ssl_test(self): + def test_rolling_upgrade_with_internode_ssl(self): """ Rolling upgrade test using internode ssl. """ @@ -301,17 +302,17 @@ class UpgradeTester(Tester): cluster.set_configuration_options({'enable_user_defined_functions': 'true'}) if internode_ssl: - debug("***using internode ssl***") - generate_ssl_stores(self.test_path) - self.cluster.enable_internode_ssl(self.test_path) + logger.debug("***using internode ssl***") + generate_ssl_stores(self.fixture_dtest_setup.test_path) + self.cluster.enable_internode_ssl(self.fixture_dtest_setup.test_path) if populate: # Start with 3 node cluster - debug('Creating cluster (%s)' % self.test_version_metas[0].version) + logger.debug('Creating cluster (%s)' % self.test_version_metas[0].version) cluster.populate(3) [node.start(use_jna=True, wait_for_binary_proto=True) for node in cluster.nodelist()] else: - debug("Skipping cluster creation (should already be built)") + logger.debug("Skipping cluster creation (should already be built)") # add nodes to self for convenience for i, node in enumerate(cluster.nodelist(), 1): @@ -324,7 +325,7 @@ class UpgradeTester(Tester): else: self._create_schema() else: - debug("Skipping schema creation (should already be built)") + logger.debug("Skipping schema creation (should already be built)") time.sleep(5) # sigh... self._log_current_ver(self.test_version_metas[0]) @@ -344,8 +345,8 @@ class UpgradeTester(Tester): self.upgrade_to_version(version_meta, partial=True, nodes=(node,), internode_ssl=internode_ssl) - self._check_on_subprocs(self.subprocs) - debug('Successfully upgraded %d of %d nodes to %s' % + self._check_on_subprocs(self.fixture_dtest_setup.subprocs) + logger.debug('Successfully upgraded %d of %d nodes to %s' % (num + 1, len(self.cluster.nodelist()), version_meta.version)) self.cluster.set_install_dir(version=version_meta.version) @@ -375,7 +376,7 @@ class UpgradeTester(Tester): for call in after_upgrade_call: call() - debug('All nodes successfully upgraded to %s' % version_meta.version) + logger.debug('All nodes successfully upgraded to %s' % version_meta.version) self._log_current_ver(version_meta) cluster.stop() @@ -384,7 +385,7 @@ class UpgradeTester(Tester): # just to be super sure we get cleaned up self._terminate_subprocs() - super(UpgradeTester, self).tearDown() + super(TestUpgrade, self).tearDown() def _check_on_subprocs(self, subprocs): """ @@ -402,12 +403,12 @@ class UpgradeTester(Tester): raise RuntimeError(message) def _terminate_subprocs(self): - for s in self.subprocs: + for s in self.fixture_dtest_setup.subprocs: if s.is_alive(): try: psutil.Process(s.pid).kill() # with fire damnit except Exception: - debug("Error terminating subprocess. There could be a lingering process.") + logger.debug("Error terminating subprocess. There could be a lingering process.") pass def upgrade_to_version(self, version_meta, partial=False, nodes=None, internode_ssl=False): @@ -416,21 +417,21 @@ class UpgradeTester(Tester): that are specified by *nodes*, otherwise ignore *nodes* specified and upgrade all nodes. """ - debug('Upgrading {nodes} to {version}'.format(nodes=[n.name for n in nodes] if nodes is not None else 'all nodes', version=version_meta.version)) + logger.debug('Upgrading {nodes} to {version}'.format(nodes=[n.name for n in nodes] if nodes is not None else 'all nodes', version=version_meta.version)) switch_jdks(version_meta.java_version) - debug("JAVA_HOME: " + os.environ.get('JAVA_HOME')) + logger.debug("JAVA_HOME: " + os.environ.get('JAVA_HOME')) if not partial: nodes = self.cluster.nodelist() for node in nodes: - debug('Shutting down node: ' + node.name) + logger.debug('Shutting down node: ' + node.name) node.drain() node.watch_log_for("DRAINED") node.stop(wait_other_notice=False) for node in nodes: node.set_install_dir(version=version_meta.version) - debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) + logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) if internode_ssl and version_meta.version >= '4.0': node.set_configuration_options({'server_encryption_options': {'enabled': True, 'enable_legacy_ssl_storage_port': True}}) @@ -441,7 +442,7 @@ class UpgradeTester(Tester): # Restart nodes on new version for node in nodes: - debug('Starting %s on new version (%s)' % (node.name, version_meta.version)) + logger.debug('Starting %s on new version (%s)' % (node.name, version_meta.version)) # Setup log4j / logback again (necessary moving from 2.0 -> 2.1): node.set_log_level("INFO") node.start(wait_other_notice=240, wait_for_binary_proto=True) @@ -453,7 +454,7 @@ class UpgradeTester(Tester): """ vers = [m.version for m in self.test_version_metas] curr_index = vers.index(current_version_meta.version) - debug( + logger.debug( "Current upgrade path: {}".format( vers[:curr_index] + ['***' + current_version_meta.version + '***'] + vers[curr_index + 1:])) @@ -496,7 +497,7 @@ class UpgradeTester(Tester): def _write_values(self, num=100): session = self.patient_cql_connection(self.node2, protocol_version=self.protocol_version) session.execute("use upgrade") - for i in xrange(num): + for i in range(num): x = len(self.row_values) + 1 session.execute("UPDATE cf SET v='%d' WHERE k=%d" % (x, x)) self.row_values.add(x) @@ -509,8 +510,8 @@ class UpgradeTester(Tester): query = SimpleStatement("SELECT k,v FROM cf WHERE k=%d" % x, consistency_level=consistency_level) result = session.execute(query) k, v = result[0] - self.assertEqual(x, k) - self.assertEqual(str(x), v) + assert x == k + assert str(x) == v def _wait_until_queue_condition(self, label, queue, opfunc, required_len, max_wait_s=600): """ @@ -528,14 +529,14 @@ class UpgradeTester(Tester): try: qsize = queue.qsize() except NotImplementedError: - debug("Queue size may not be checkable on Mac OS X. Test will continue without waiting.") + logger.debug("Queue size may not be checkable on Mac OS X. Test will continue without waiting.") break if opfunc(qsize, required_len): - debug("{} queue size ({}) is '{}' to {}. Continuing.".format(label, qsize, opfunc.__name__, required_len)) + logger.debug("{} queue size ({}) is '{}' to {}. Continuing.".format(label, qsize, opfunc.__name__, required_len)) break if divmod(round(time.time()), 30)[1] == 0: - debug("{} queue size is at {}, target is to reach '{}' {}".format(label, qsize, opfunc.__name__, required_len)) + logger.debug("{} queue size is at {}, target is to reach '{}' {}".format(label, qsize, opfunc.__name__, required_len)) time.sleep(0.1) continue @@ -559,7 +560,7 @@ class UpgradeTester(Tester): writer = Process(target=data_writer, args=(self, to_verify_queue, verification_done_queue, 25)) # daemon subprocesses are killed automagically when the parent process exits writer.daemon = True - self.subprocs.append(writer) + self.fixture_dtest_setup.subprocs.append(writer) writer.start() if wait_for_rowcount > 0: @@ -568,7 +569,7 @@ class UpgradeTester(Tester): verifier = Process(target=data_checker, args=(self, to_verify_queue, verification_done_queue)) # daemon subprocesses are killed automagically when the parent process exits verifier.daemon = True - self.subprocs.append(verifier) + self.fixture_dtest_setup.subprocs.append(verifier) verifier.start() return writer, verifier, to_verify_queue @@ -588,7 +589,7 @@ class UpgradeTester(Tester): incrementer = Process(target=data_writer, args=(self, to_verify_queue, verification_done_queue, 25)) # daemon subprocesses are killed automagically when the parent process exits incrementer.daemon = True - self.subprocs.append(incrementer) + self.fixture_dtest_setup.subprocs.append(incrementer) incrementer.start() if wait_for_rowcount > 0: @@ -597,13 +598,13 @@ class UpgradeTester(Tester): count_verifier = Process(target=data_checker, args=(self, to_verify_queue, verification_done_queue)) # daemon subprocesses are killed automagically when the parent process exits count_verifier.daemon = True - self.subprocs.append(count_verifier) + self.fixture_dtest_setup.subprocs.append(count_verifier) count_verifier.start() return incrementer, count_verifier, to_verify_queue def _increment_counters(self, opcount=25000): - debug("performing {opcount} counter increments".format(opcount=opcount)) + logger.debug("performing {opcount} counter increments".format(opcount=opcount)) session = self.patient_cql_connection(self.node2, protocol_version=self.protocol_version) session.execute("use upgrade;") @@ -616,7 +617,7 @@ class UpgradeTester(Tester): fail_count = 0 for i in range(opcount): - key1 = random.choice(self.expected_counts.keys()) + key1 = random.choice(list(self.expected_counts.keys())) key2 = random.randint(1, 10) try: query = SimpleStatement(update_counter_query.format(key1=key1, key2=key2), consistency_level=ConsistencyLevel.ALL) @@ -628,15 +629,15 @@ class UpgradeTester(Tester): if fail_count > 100: break - self.assertLess(fail_count, 100, "Too many counter increment failures") + assert fail_count, 100 < "Too many counter increment failures" def _check_counters(self): - debug("Checking counter values...") + logger.debug("Checking counter values...") session = self.patient_cql_connection(self.node2, protocol_version=self.protocol_version) session.execute("use upgrade;") - for key1 in self.expected_counts.keys(): - for key2 in self.expected_counts[key1].keys(): + for key1 in list(self.expected_counts.keys()): + for key2 in list(self.expected_counts[key1].keys()): expected_value = self.expected_counts[key1][key2] query = SimpleStatement("SELECT c from countertable where k1='{key1}' and k2={key2};".format(key1=key1, key2=key2), @@ -649,10 +650,10 @@ class UpgradeTester(Tester): # counter wasn't found actual_value = None - self.assertEqual(actual_value, expected_value) + assert actual_value == expected_value def _check_select_count(self, consistency_level=ConsistencyLevel.ALL): - debug("Checking SELECT COUNT(*)") + logger.debug("Checking SELECT COUNT(*)") session = self.patient_cql_connection(self.node2, protocol_version=self.protocol_version) session.execute("use upgrade;") @@ -663,7 +664,7 @@ class UpgradeTester(Tester): if result is not None: actual_num_rows = result[0][0] - self.assertEqual(actual_num_rows, expected_num_rows, "SELECT COUNT(*) returned %s when expecting %s" % (actual_num_rows, expected_num_rows)) + assert actual_num_rows == expected_num_rows, "SELECT COUNT(*) returned %s when expecting %s" % (actual_num_rows, expected_num_rows) else: self.fail("Count query did not return") @@ -678,7 +679,7 @@ class BootstrapMixin(object): def _bootstrap_new_node(self): # Check we can bootstrap a new node on the upgraded cluster: - debug("Adding a node to the cluster") + logger.debug("Adding a node to the cluster") nnode = new_node(self.cluster, remote_debug_port=str(2000 + len(self.cluster.nodes))) nnode.start(use_jna=True, wait_other_notice=240, wait_for_binary_proto=True) self._write_values() @@ -688,7 +689,7 @@ class BootstrapMixin(object): def _bootstrap_new_node_multidc(self): # Check we can bootstrap a new node on the upgraded cluster: - debug("Adding a node to the cluster") + logger.debug("Adding a node to the cluster") nnode = new_node(self.cluster, remote_debug_port=str(2000 + len(self.cluster.nodes)), data_center='dc2') nnode.start(use_jna=True, wait_other_notice=240, wait_for_binary_proto=True) @@ -697,11 +698,11 @@ class BootstrapMixin(object): self._check_values() self._check_counters() - def bootstrap_test(self): + def test_bootstrap(self): # try and add a new node self.upgrade_scenario(after_upgrade_call=(self._bootstrap_new_node,)) - def bootstrap_multidc_test(self): + def test_bootstrap_multidc(self): # try and add a new node # multi dc, 2 nodes in each dc cluster = self.cluster @@ -759,26 +760,26 @@ def create_upgrade_class(clsname, version_metas, protocol_version, extra_config = (('partitioner', 'org.apache.cassandra.dht.Murmur3Partitioner'),) if bootstrap_test: - parent_classes = (UpgradeTester, BootstrapMixin) + parent_classes = (TestUpgrade, BootstrapMixin) else: - parent_classes = (UpgradeTester,) + parent_classes = (TestUpgrade,) # short names for debug output parent_class_names = [cls.__name__ for cls in parent_classes] - print_("Creating test class {} ".format(clsname)) - print_(" for C* versions:\n{} ".format(pprint.pformat(version_metas))) - print_(" using protocol: v{}, and parent classes: {}".format(protocol_version, parent_class_names)) - print_(" to run these tests alone, use `nosetests {}.py:{}`".format(__name__, clsname)) + print("Creating test class {} ".format(clsname)) + print(" for C* versions:\n{} ".format(pprint.pformat(version_metas))) + print(" using protocol: v{}, and parent classes: {}".format(protocol_version, parent_class_names)) + print(" to run these tests alone, use `nosetests {}.py:{}`".format(__name__, clsname)) upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or version_metas[-1].matches_current_env_version_family - - newcls = skipUnless(upgrade_applies_to_env, 'test not applicable to env.')( - type( + if not upgrade_applies_to_env: + pytest.mark.skip(reason='test not applicable to env.') + newcls = type( clsname, parent_classes, {'test_version_metas': version_metas, '__test__': True, 'protocol_version': protocol_version, 'extra_config': extra_config} - )) + ) if clsname in globals(): raise RuntimeError("Class by name already exists!") @@ -837,7 +838,7 @@ for upgrade in MULTI_UPGRADES: # looks like this test should actually run in the current env, so let's set the final version to match the env exactly oldmeta = metas[-1] newmeta = oldmeta.clone_with_local_env_version() - debug("{} appears applicable to current env. Overriding final test version from {} to {}".format(upgrade.name, oldmeta.version, newmeta.version)) + logger.debug("{} appears applicable to current env. Overriding final test version from {} to {}".format(upgrade.name, oldmeta.version, newmeta.version)) metas[-1] = newmeta create_upgrade_class(upgrade.name, [m for m in metas], protocol_version=upgrade.protocol_version, extra_config=upgrade.extra_config) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/user_functions_test.py ---------------------------------------------------------------------- diff --git a/user_functions_test.py b/user_functions_test.py index 5982f28..7a10233 100644 --- a/user_functions_test.py +++ b/user_functions_test.py @@ -1,22 +1,39 @@ import math import time +import pytest +import logging + from distutils.version import LooseVersion from cassandra import FunctionFailure -from dtest import CASSANDRA_VERSION_FROM_BUILD, Tester, debug, create_ks +from dtest_setup_overrides import DTestSetupOverrides + +from dtest import CASSANDRA_VERSION_FROM_BUILD, Tester, create_ks from tools.assertions import assert_invalid, assert_none, assert_one -from tools.decorators import since from tools.misc import ImmutableMapping +since = pytest.mark.since +logger = logging.getLogger(__name__) + @since('2.2') class TestUserFunctions(Tester): - if CASSANDRA_VERSION_FROM_BUILD >= '3.0': - cluster_options = ImmutableMapping({'enable_user_defined_functions': 'true', - 'enable_scripted_user_defined_functions': 'true'}) - else: - cluster_options = ImmutableMapping({'enable_user_defined_functions': 'true'}) + + @pytest.fixture(scope='function', autouse=True) + def fixture_dtest_setup_overrides(self): + dtest_setup_overrides = DTestSetupOverrides() + if CASSANDRA_VERSION_FROM_BUILD >= '3.0': + dtest_setup_overrides.cluster_options = ImmutableMapping({'enable_user_defined_functions': 'true', + 'enable_scripted_user_defined_functions': 'true'}) + else: + dtest_setup_overrides.cluster_options = ImmutableMapping({'enable_user_defined_functions': 'true'}) + return dtest_setup_overrides + + @pytest.fixture(scope='function', autouse=True) + def parse_dtest_config(self, parse_dtest_config): + + return parse_dtest_config def prepare(self, create_keyspace=True, nodes=1, rf=1): cluster = self.cluster @@ -120,7 +137,7 @@ class TestUserFunctions(Tester): "CREATE FUNCTION bad_sin ( input double ) CALLED ON NULL INPUT RETURNS uuid LANGUAGE java AS 'return Math.sin(input);';", "Type mismatch: cannot convert from double to UUID") - def udf_overload_test(self): + def test_udf_overload(self): session = self.prepare(nodes=3) @@ -154,7 +171,7 @@ class TestUserFunctions(Tester): # should now work - unambiguous session.execute("DROP FUNCTION overloaded") - def udf_scripting_test(self): + def test_udf_scripting(self): session = self.prepare() session.execute("create table nums (key int primary key, val double);") @@ -177,7 +194,7 @@ class TestUserFunctions(Tester): assert_one(session, "select plustwo(key) from nums where key = 3", [5]) - def default_aggregate_test(self): + def test_default_aggregate(self): session = self.prepare() session.execute("create table nums (key int primary key, val double);") @@ -190,7 +207,7 @@ class TestUserFunctions(Tester): assert_one(session, "SELECT avg(val) FROM nums", [5.0]) assert_one(session, "SELECT count(*) FROM nums", [9]) - def aggregate_udf_test(self): + def test_aggregate_udf(self): session = self.prepare() session.execute("create table nums (key int primary key, val int);") @@ -209,7 +226,7 @@ class TestUserFunctions(Tester): assert_invalid(session, "create aggregate aggthree(int) sfunc test stype int finalfunc aggtwo") - def udf_with_udt_test(self): + def test_udf_with_udt(self): """ Test UDFs that operate on non-frozen UDTs. @jira_ticket CASSANDRA-7423 @@ -225,7 +242,7 @@ class TestUserFunctions(Tester): frozen_vals = (True,) for frozen in frozen_vals: - debug("Using {} UDTs".format("frozen" if frozen else "non-frozen")) + logger.debug("Using {} UDTs".format("frozen" if frozen else "non-frozen")) table_name = "tab_frozen" if frozen else "tab" column_type = "frozen<test>" if frozen else "test" @@ -240,7 +257,7 @@ class TestUserFunctions(Tester): assert_invalid(session, "drop type test;") @since('2.2') - def udf_with_udt_keyspace_isolation_test(self): + def test_udf_with_udt_keyspace_isolation(self): """ Ensure functions dont allow a UDT from another keyspace @jira_ticket CASSANDRA-9409 @@ -266,7 +283,7 @@ class TestUserFunctions(Tester): "Statement on keyspace user_ks cannot refer to a user type in keyspace ks" ) - def aggregate_with_udt_keyspace_isolation_test(self): + def test_aggregate_with_udt_keyspace_isolation(self): """ Ensure aggregates dont allow a UDT from another keyspace @jira_ticket CASSANDRA-9409 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org