This is an automated email from the ASF dual-hosted git repository. aweisberg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cassandra-dtest.git
The following commit(s) were added to refs/heads/master by this push: new 84598f1 Reenable upgrade tests 84598f1 is described below commit 84598f11513f4c1dc0be4d7115a47b59940a649e Author: Ariel Weisberg <aweisb...@apple.com> AuthorDate: Wed Oct 31 16:17:17 2018 -0400 Reenable upgrade tests Patch by Ariel Weisberg; Reviewed by Dinesh Joshi for CASSANDRA-14421 Co-authored-by: Ariel Weisberg <aweisb...@apple.com> Co-authored-by: Dinesh A. Joshi <dinesh.jo...@apple.com> --- conftest.py | 63 +++-- cqlsh_tests/cqlsh_copy_tests.py | 1 + cqlsh_tests/cqlsh_tests.py | 3 + dtest.py | 9 +- dtest_setup.py | 28 +- pytest.ini | 3 +- sstable_generation_loading_test.py | 41 ++- tools/assertions.py | 8 + tools/misc.py | 22 +- tools/paging.py | 14 +- upgrade_tests/compatibility_flag_test.py | 2 + upgrade_tests/conftest.py | 4 + upgrade_tests/cql_tests.py | 249 +++++++++++++---- upgrade_tests/paging_test.py | 110 ++++---- upgrade_tests/regression_test.py | 10 +- upgrade_tests/repair_test.py | 4 +- upgrade_tests/storage_engine_upgrade_test.py | 120 ++++++--- upgrade_tests/thrift_upgrade_test.py | 356 +++++++++++++++++-------- upgrade_tests/upgrade_base.py | 30 ++- upgrade_tests/upgrade_compact_storage.py | 2 + upgrade_tests/upgrade_manifest.py | 131 +++++---- upgrade_tests/upgrade_schema_agreement_test.py | 2 + upgrade_tests/upgrade_supercolumns_test.py | 23 +- upgrade_tests/upgrade_through_versions_test.py | 87 +++--- 24 files changed, 922 insertions(+), 400 deletions(-) diff --git a/conftest.py b/conftest.py index 5b1a276..bfd4299 100644 --- a/conftest.py +++ b/conftest.py @@ -18,6 +18,7 @@ from netifaces import AF_INET from psutil import virtual_memory import netifaces as ni +import ccmlib.repository from ccmlib.common import validate_install_dir, is_win, get_version_from_build from dtest_config import DTestConfig @@ -75,6 +76,8 @@ def pytest_addoption(parser): "after the test completes") parser.addoption("--enable-jacoco-code-coverage", action="store_true", default=False, 
help="Enable JaCoCo Code Coverage Support") + parser.addoption("--upgrade-version-selection", action="store", default="indev", + help="Specify whether to run indev, releases, or both") def sufficient_system_resources_for_resource_intensive_tests(): @@ -364,12 +367,29 @@ def fixture_since(request, fixture_dtest_setup): since_str = request.node.get_closest_marker('since').args[0] since = LooseVersion(since_str) - # use cassandra_version_from_build as it's guaranteed to be a LooseVersion - # whereas cassandra_version may be a string if set in the cli options - current_running_version = fixture_dtest_setup.dtest_config.cassandra_version_from_build - skip_msg = _skip_msg(current_running_version, since, max_version) - if skip_msg: - pytest.skip(skip_msg) + # For upgrade tests don't run the test if any of the involved versions + # are excluded by the annotation + if hasattr(request.cls, "UPGRADE_PATH"): + upgrade_path = request.cls.UPGRADE_PATH + ccm_repo_cache_dir, _ = ccmlib.repository.setup(upgrade_path.starting_meta.version) + starting_version = get_version_from_build(ccm_repo_cache_dir) + skip_msg = _skip_msg(starting_version, since, max_version) + if skip_msg: + pytest.skip(skip_msg) + ccm_repo_cache_dir, _ = ccmlib.repository.setup(upgrade_path.upgrade_meta.version) + ending_version = get_version_from_build(ccm_repo_cache_dir) + skip_msg = _skip_msg(ending_version, since, max_version) + if skip_msg: + pytest.skip(skip_msg) + else: + # For regular tests the value in the current cluster actually means something so we should + # use that to check. 
+ # Use cassandra_version_from_build as it's guaranteed to be a LooseVersion + # whereas cassandra_version may be a string if set in the cli options + current_running_version = fixture_dtest_setup.dtest_config.cassandra_version_from_build + skip_msg = _skip_msg(current_running_version, since, max_version) + if skip_msg: + pytest.skip(skip_msg) @pytest.fixture(autouse=True) @@ -409,13 +429,16 @@ def pytest_collection_modifyitems(items, config): This function is called upon during the pytest test collection phase and allows for modification of the test items within the list """ - if not config.getoption("--collect-only") and config.getoption("--cassandra-dir") is None: - if config.getoption("--cassandra-version") is None: + collect_only = config.getoption("--collect-only") + cassandra_dir = config.getoption("--cassandra-dir") + cassandra_version = config.getoption("--cassandra-version") + if not collect_only and cassandra_dir is None: + if cassandra_version is None: raise Exception("Required dtest arguments were missing! You must provide either --cassandra-dir " "or --cassandra-version. Refer to the documentation or invoke the help with --help.") # Either cassandra_version or cassandra_dir is defined, so figure out the version - CASSANDRA_VERSION = config.getoption("--cassandra-version") or get_version_from_build(config.getoption("--cassandra-dir")) + CASSANDRA_VERSION = cassandra_version or get_version_from_build(cassandra_dir) # Check that use_off_heap_memtables is supported in this c* version if config.getoption("--use-off-heap-memtables") and ("3.0" <= CASSANDRA_VERSION < "3.4"): @@ -433,16 +456,17 @@ def pytest_collection_modifyitems(items, config): for item in items: deselect_test = False - if item.get_closest_marker("resource_intensive"): - if config.getoption("--force-resource-intensive-tests"): - pass - if config.getoption("--skip-resource-intensive-tests"): - deselect_test = True - logger.info("SKIP: Deselecting test %s as test marked resource_intensive. 
To force execution of " - "this test re-run with the --force-resource-intensive-tests command line argument" % item.name) - if not sufficient_system_resources_resource_intensive: - deselect_test = True - logger.info("SKIP: Deselecting resource_intensive test %s due to insufficient system resources" % item.name) + if item.get_closest_marker("resource_intensive") and not collect_only: + force_resource_intensive = config.getoption("--force-resource-intensive-tests") + skip_resource_intensive = config.getoption("--skip-resource-intensive-tests") + if not force_resource_intensive: + if skip_resource_intensive: + deselect_test = True + logger.info("SKIP: Deselecting test %s as test marked resource_intensive. To force execution of " + "this test re-run with the --force-resource-intensive-tests command line argument" % item.name) + if not sufficient_system_resources_resource_intensive: + deselect_test = True + logger.info("SKIP: Deselecting resource_intensive test %s due to insufficient system resources" % item.name) if item.get_closest_marker("no_vnodes"): if config.getoption("--use-vnodes"): @@ -480,4 +504,3 @@ def pytest_collection_modifyitems(items, config): config.hook.pytest_deselected(items=deselected_items) items[:] = selected_items - diff --git a/cqlsh_tests/cqlsh_copy_tests.py b/cqlsh_tests/cqlsh_copy_tests.py index 9769fd7..662fb31 100644 --- a/cqlsh_tests/cqlsh_copy_tests.py +++ b/cqlsh_tests/cqlsh_copy_tests.py @@ -58,6 +58,7 @@ class UTC(datetime.tzinfo): return datetime.timedelta(0) +@pytest.mark.skip("These aren't functioning just yet") class TestCqlshCopy(Tester): """ Tests the COPY TO and COPY FROM features in cqlsh. 
diff --git a/cqlsh_tests/cqlsh_tests.py b/cqlsh_tests/cqlsh_tests.py index ef38ee2..e5c601c 100644 --- a/cqlsh_tests/cqlsh_tests.py +++ b/cqlsh_tests/cqlsh_tests.py @@ -27,6 +27,7 @@ since = pytest.mark.since logger = logging.getLogger(__name__) +@pytest.mark.skip("These aren't functioning just yet") class TestCqlsh(Tester): @classmethod @@ -1673,6 +1674,7 @@ Tracing session:""") return p.communicate() +@pytest.mark.skip("These aren't functioning just yet") class TestCqlshSmoke(Tester): """ Tests simple use cases for clqsh. @@ -1945,6 +1947,7 @@ class TestCqlshSmoke(Tester): return [table.name for table in list(self.session.cluster.metadata.keyspaces[keyspace].tables.values())] +@pytest.mark.skip("These aren't functioning just yet") class CqlLoginTest(Tester): """ Tests login which requires password authenticator diff --git a/dtest.py b/dtest.py index 095ccd0..9027d75 100644 --- a/dtest.py +++ b/dtest.py @@ -8,6 +8,8 @@ import sys import threading import time import traceback +from distutils.version import LooseVersion + import pytest import cassandra @@ -41,6 +43,8 @@ if len(config.read(os.path.expanduser('~/.cassandra-dtest'))) > 0: RUN_STATIC_UPGRADE_MATRIX = os.environ.get('RUN_STATIC_UPGRADE_MATRIX', '').lower() in ('yes', 'true') +MAJOR_VERSION_4 = LooseVersion('4.0') + logger = logging.getLogger(__name__) @@ -236,6 +240,7 @@ class Tester: def set_dtest_setup_on_function(self, fixture_dtest_setup): self.fixture_dtest_setup = fixture_dtest_setup self.dtest_config = fixture_dtest_setup.dtest_config + return None def set_node_to_current_version(self, node): version = os.environ.get('CASSANDRA_VERSION') @@ -454,12 +459,12 @@ def run_scenarios(scenarios, handler, deferred_exceptions=tuple()): try: handler(scenario) except deferred_exceptions as e: - tracebacks.append(traceback.format_exc(sys.exc_info())) + tracebacks.append(traceback.format_exc()) errors.append(type(e)('encountered {} {} running scenario:\n {}\n'.format(e.__class__.__name__, str(e), scenario))) 
logger.debug("scenario {}/{} encountered a deferrable exception, continuing".format(i, len(scenarios))) except Exception as e: # catch-all for any exceptions not intended to be deferred - tracebacks.append(traceback.format_exc(sys.exc_info())) + tracebacks.append(traceback.format_exc()) errors.append(type(e)('encountered {} {} running scenario:\n {}\n'.format(e.__class__.__name__, str(e), scenario))) logger.debug("scenario {}/{} encountered a non-deferrable exception, aborting".format(i, len(scenarios))) raise MultiError(errors, tracebacks) diff --git a/dtest_setup.py b/dtest_setup.py index 9e3f330..4b2ece9 100644 --- a/dtest_setup.py +++ b/dtest_setup.py @@ -420,6 +420,15 @@ class DTestSetup: if self.cluster.version() >= '4': values['corrupted_tombstone_strategy'] = 'exception' + if self.dtest_config.use_vnodes: + self.cluster.set_configuration_options( + values={'initial_token': None, 'num_tokens': self.dtest_config.num_tokens}) + else: + self.cluster.set_configuration_options(values={'num_tokens': None}) + + if self.dtest_config.use_off_heap_memtables: + self.cluster.set_configuration_options(values={'memtable_allocation_type': 'offheap_objects'}) + self.cluster.set_configuration_options(values) logger.debug("Done setting configuration options:\n" + pprint.pformat(self.cluster._config_options, indent=4)) @@ -454,7 +463,7 @@ class DTestSetup: @staticmethod def create_ccm_cluster(dtest_setup): - logger.debug("cluster ccm directory: " + dtest_setup.test_path) + logger.info("cluster ccm directory: " + dtest_setup.test_path) version = dtest_setup.dtest_config.cassandra_version if version: @@ -462,14 +471,6 @@ class DTestSetup: else: cluster = Cluster(dtest_setup.test_path, dtest_setup.cluster_name, cassandra_dir=dtest_setup.dtest_config.cassandra_dir) - if dtest_setup.dtest_config.use_vnodes: - cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': dtest_setup.dtest_config.num_tokens}) - else: - 
cluster.set_configuration_options(values={'num_tokens': None}) - - if dtest_setup.dtest_config.use_off_heap_memtables: - cluster.set_configuration_options(values={'memtable_allocation_type': 'offheap_objects'}) - cluster.set_datadir_count(dtest_setup.dtest_config.data_dir_count) cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', dtest_setup.dtest_config.jemalloc_path) @@ -513,3 +514,12 @@ class DTestSetup: # write_last_test_file(cls.test_path, cls.cluster) # cls.post_initialize_cluster() + + def reinitialize_cluster_for_different_version(self): + """ + This method is used by upgrade tests to re-init the cluster to work with a specific + version that may not be compatible with the existing configuration options + """ + self.init_default_config() + + diff --git a/pytest.ini b/pytest.ini index f088fa5..c8e85d4 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,6 +1,7 @@ [pytest] +python_files = test_*.py *_test.py *_tests.py junit_suite_name = Cassandra dtests log_print = True log_level = INFO log_format = %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s -timeout = 900 \ No newline at end of file +timeout = 900 diff --git a/sstable_generation_loading_test.py b/sstable_generation_loading_test.py index ab99a03..901011d 100644 --- a/sstable_generation_loading_test.py +++ b/sstable_generation_loading_test.py @@ -2,18 +2,19 @@ import os import subprocess import time import distutils.dir_util +from distutils.version import LooseVersion + import pytest import logging from ccmlib import common as ccmcommon -from dtest import Tester, create_ks, create_cf +from dtest import Tester, create_ks, create_cf, MAJOR_VERSION_4 from tools.assertions import assert_all, assert_none, assert_one since = pytest.mark.since logger = logging.getLogger(__name__) - # WARNING: sstableloader tests should be added to TestSSTableGenerationAndLoading (below), # and not to BaseSStableLoaderTest (which is shared with upgrade tests) @@ -27,45 +28,62 @@ class TestBaseSStableLoader(Tester): 
fixture_dtest_setup.allow_log_errors = True upgrade_from = None - compact = False + test_compact = False + + def compact(self): + return self.fixture_dtest_setup.cluster.version() < MAJOR_VERSION_4 and self.test_compact def create_schema(self, session, ks, compression): create_ks(session, ks, rf=2) - create_cf(session, "standard1", compression=compression, compact_storage=self.compact) + create_cf(session, "standard1", compression=compression, compact_storage=self.compact()) create_cf(session, "counter1", compression=compression, columns={'v': 'counter'}, - compact_storage=self.compact) + compact_storage=self.compact()) + + def skip_base_class_test(self): + if self.__class__.__name__ != 'TestBasedSSTableLoader' and self.upgrade_from is None: + pytest.skip("Don't need to run base class test, only derived classes") def test_sstableloader_compression_none_to_none(self): + self.skip_base_class_test() self.load_sstable_with_configuration(None, None) def test_sstableloader_compression_none_to_snappy(self): + self.skip_base_class_test() self.load_sstable_with_configuration(None, 'Snappy') def test_sstableloader_compression_none_to_deflate(self): + self.skip_base_class_test() self.load_sstable_with_configuration(None, 'Deflate') def test_sstableloader_compression_snappy_to_none(self): + self.skip_base_class_test() self.load_sstable_with_configuration('Snappy', None) def test_sstableloader_compression_snappy_to_snappy(self): + self.skip_base_class_test() self.load_sstable_with_configuration('Snappy', 'Snappy') def test_sstableloader_compression_snappy_to_deflate(self): + self.skip_base_class_test() self.load_sstable_with_configuration('Snappy', 'Deflate') def test_sstableloader_compression_deflate_to_none(self): + self.skip_base_class_test() self.load_sstable_with_configuration('Deflate', None) def test_sstableloader_compression_deflate_to_snappy(self): + self.skip_base_class_test() self.load_sstable_with_configuration('Deflate', 'Snappy') def 
test_sstableloader_compression_deflate_to_deflate(self): + self.skip_base_class_test() self.load_sstable_with_configuration('Deflate', 'Deflate') def test_sstableloader_with_mv(self): """ @jira_ticket CASSANDRA-11275 """ + self.skip_base_class_test() def create_schema_with_mv(session, ks, compression): self.create_schema(session, ks, compression) # create a materialized view @@ -125,9 +143,11 @@ class TestBaseSStableLoader(Tester): cluster = self.cluster if self.upgrade_from: logger.debug("Generating sstables with version %s" % (self.upgrade_from)) + default_install_version = self.cluster.version() default_install_dir = self.cluster.get_install_dir() # Forcing cluster version on purpose cluster.set_install_dir(version=self.upgrade_from) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() logger.debug("Using jvm_args={}".format(self.jvm_args)) cluster.populate(2).start(jvm_args=list(self.jvm_args)) node1, node2 = cluster.nodelist() @@ -141,6 +161,16 @@ class TestBaseSStableLoader(Tester): session.execute("UPDATE standard1 SET v='{}' WHERE KEY='{}' AND c='col'".format(i, i)) session.execute("UPDATE counter1 SET v=v+1 WHERE KEY='{}'".format(i)) + #Will upgrade to a version that doesn't support compact storage so revert the compact + #storage, this doesn't actually fix it yet + if self.compact() and default_install_version >= MAJOR_VERSION_4: + session.execute('alter table standard1 drop compact storage'); + session.execute('alter table counter1 drop compact storage'); + node1.nodetool('rebuild') + node1.nodetool('cleanup') + node2.nodetool('rebuild') + node2.nodetool('cleanup') + node1.nodetool('drain') node1.stop() node2.nodetool('drain') @@ -158,6 +188,7 @@ class TestBaseSStableLoader(Tester): logger.debug("Running sstableloader with version from %s" % (default_install_dir)) # Return to previous version cluster.set_install_dir(install_dir=default_install_dir) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() 
cluster.start(jvm_args=list(self.jvm_args)) time.sleep(5) # let gossip figure out what is going on diff --git a/tools/assertions.py b/tools/assertions.py index 2f826ae..bce9b8a 100644 --- a/tools/assertions.py +++ b/tools/assertions.py @@ -346,3 +346,11 @@ def assert_lists_equal_ignoring_order(list1, list2, sort_key=None): sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key])) assert sorted_list1 == sorted_list2 + + +def assert_lists_of_dicts_equal(list1, list2): + for adict, bdict in zip(list1, list2): + assert(len(adict) == len(bdict)) + for key, value in adict.items(): + assert key in bdict + assert bdict[key] == value diff --git a/tools/misc.py b/tools/misc.py index aa2c084..a4502f1 100644 --- a/tools/misc.py +++ b/tools/misc.py @@ -3,6 +3,7 @@ import subprocess import time import hashlib import logging +import pytest from collections import Mapping @@ -97,7 +98,9 @@ def list_to_hashed_dict(list): normalized_list.append(tmp_list) else: normalized_list.append(item) - list_digest = hashlib.sha256(str(normalized_list).encode('utf-8', 'ignore')).hexdigest() + list_str = str(normalized_list) + utf8 = list_str.encode('utf-8', 'ignore') + list_digest = hashlib.sha256(utf8).hexdigest() hashed_dict[list_digest] = normalized_list return hashed_dict @@ -136,3 +139,20 @@ class ImmutableMapping(Mapping): def __repr__(self): return '{cls}({data})'.format(cls=self.__class__.__name__, data=self._data) + + +def wait_for_agreement(thrift, timeout=10): + def check_agreement(): + schemas = thrift.describe_schema_versions() + if len([ss for ss in list(schemas.keys()) if ss != 'UNREACHABLE']) > 1: + raise Exception("schema agreement not reached") + retry_till_success(check_agreement, timeout=timeout) + + +def add_skip(cls, reason=""): + if hasattr(cls, "pytestmark"): + cls.pytestmark = cls.pytestmark.copy() + cls.pytestmark.append(pytest.mark.skip(reason)) + else: + cls.pytestmark = [pytest.mark.skip(reason)] + return cls diff --git a/tools/paging.py 
b/tools/paging.py index 0d99bfd..7153ed6 100644 --- a/tools/paging.py +++ b/tools/paging.py @@ -165,7 +165,19 @@ class PageAssertionMixin(object): """Can be added to subclasses of unittest.Tester""" def assertEqualIgnoreOrder(self, actual, expected): - assert list_to_hashed_dict(actual) == list_to_hashed_dict(expected) + hashed_expected = list_to_hashed_dict(expected) + hashed_actual = list_to_hashed_dict(actual) + for key, expected in hashed_expected.items(): + assert key in hashed_actual, "expected %s not in actual" % str(expected) + actual = hashed_actual[key] + assert actual == expected, "actual %s not same as expected %s" % (str(actual), str(expected)) + + for key, actual in hashed_actual.items(): + assert key in hashed_expected, "actual %s not in expected" % str(actual) + expected = hashed_expected[key] + assert expected == actual, "expected %s not same as actual %s" % (str(expected), str(actual)) + + assert hashed_expected == hashed_actual def assertIsSubsetOf(self, subset, superset): diff --git a/upgrade_tests/compatibility_flag_test.py b/upgrade_tests/compatibility_flag_test.py index f308174..3711841 100644 --- a/upgrade_tests/compatibility_flag_test.py +++ b/upgrade_tests/compatibility_flag_test.py @@ -25,6 +25,7 @@ class TestCompatibilityFlag(Tester): cluster.populate(2) node1, node2 = cluster.nodelist() cluster.set_install_dir(version=from_version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() cluster.start(wait_for_binary_proto=True) node1.drain() @@ -46,6 +47,7 @@ class TestCompatibilityFlag(Tester): cluster.populate(2) node1, node2 = cluster.nodelist() cluster.set_install_dir(version=from_version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() cluster.start(wait_for_binary_proto=True) node1.drain() diff --git a/upgrade_tests/conftest.py b/upgrade_tests/conftest.py new file mode 100644 index 0000000..ecd4753 --- /dev/null +++ b/upgrade_tests/conftest.py @@ -0,0 +1,4 @@ +from .upgrade_manifest import 
set_config + +def pytest_configure(config): + set_config(config) diff --git a/upgrade_tests/cql_tests.py b/upgrade_tests/cql_tests.py index f08a141..4350877 100644 --- a/upgrade_tests/cql_tests.py +++ b/upgrade_tests/cql_tests.py @@ -16,7 +16,7 @@ from cassandra.protocol import ProtocolException, SyntaxException from cassandra.query import SimpleStatement from cassandra.util import sortedset -from dtest import RUN_STATIC_UPGRADE_MATRIX +from dtest import RUN_STATIC_UPGRADE_MATRIX, MAJOR_VERSION_4 from thrift_bindings.thrift010.ttypes import \ ConsistencyLevel as ThriftConsistencyLevel from thrift_bindings.thrift010.ttypes import (CfDef, Column, ColumnDef, @@ -27,6 +27,7 @@ from thrift_test import get_thrift_client from tools.assertions import (assert_all, assert_invalid, assert_length_equal, assert_none, assert_one, assert_row_count) from tools.data import rows_to_list +from tools.misc import add_skip from .upgrade_base import UpgradeTester from .upgrade_manifest import build_upgrade_pairs @@ -37,6 +38,9 @@ logger = logging.getLogger(__name__) @pytest.mark.upgrade_test class TestCQL(UpgradeTester): + def is_40_or_greater(self): + return self.UPGRADE_PATH.upgrade_meta.family in ('trunk', '4.0') + def test_static_cf(self): """ Test static CF syntax """ cursor = self.prepare() @@ -79,7 +83,7 @@ class TestCQL(UpgradeTester): assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 37, None, None], [UUID('550e8400-e29b-41d4-a716-446655440000'), 36, None, None]]) - @since('2.0', max_version='3') # 3.0+ not compatible with protocol version 2 + @since('2.0', max_version='2.99') # 3.0+ not compatible with protocol version 2 def test_large_collection_errors(self): """ For large collections, make sure that we are printing warnings """ for version in self.get_node_versions(): @@ -128,6 +132,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + 
cursor.execute("ALTER TABLE users DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE users") @@ -139,15 +147,25 @@ class TestCQL(UpgradeTester): # Queries assert_one(cursor, "SELECT firstname, lastname FROM users WHERE userid = 550e8400-e29b-41d4-a716-446655440000", ['Frodo', 'Baggins']) - assert_one(cursor, "SELECT * FROM users WHERE userid = 550e8400-e29b-41d4-a716-446655440000", [UUID('550e8400-e29b-41d4-a716-446655440000'), 32, 'Frodo', 'Baggins']) - - # FIXME There appears to be some sort of problem with reusable cells - # when executing this query. It's likely that CASSANDRA-9705 will - # fix this, but I'm not 100% sure. - assert_one(cursor, "SELECT * FROM users WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479", [UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 33, 'Samwise', 'Gamgee']) - - assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 33, 'Samwise', 'Gamgee'], - [UUID('550e8400-e29b-41d4-a716-446655440000'), 32, 'Frodo', 'Baggins']]) + if self.is_40_or_greater(): + assert_one(cursor, "SELECT * FROM users WHERE userid = 550e8400-e29b-41d4-a716-446655440000", + [UUID('550e8400-e29b-41d4-a716-446655440000'), None, 32, 'Frodo', 'Baggins', None]) + assert_one(cursor, "SELECT * FROM users WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479", + [UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), None, 33, 'Samwise', 'Gamgee', None]) + assert_all(cursor, "SELECT * FROM users", + [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), None, 33, 'Samwise', 'Gamgee', None], + [UUID('550e8400-e29b-41d4-a716-446655440000'), None, 32, 'Frodo', 'Baggins', None]]) + else: + assert_one(cursor, "SELECT * FROM users WHERE userid = 550e8400-e29b-41d4-a716-446655440000", + [UUID('550e8400-e29b-41d4-a716-446655440000'), 32, 'Frodo', 'Baggins']) + # FIXME There appears to be some sort of problem with reusable cells + # 
when executing this query. It's likely that CASSANDRA-9705 will + # fix this, but I'm not 100% sure. + assert_one(cursor, "SELECT * FROM users WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479", + [UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 33, 'Samwise', 'Gamgee']) + assert_all(cursor, "SELECT * FROM users", + [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 33, 'Samwise', 'Gamgee'], + [UUID('550e8400-e29b-41d4-a716-446655440000'), 32, 'Frodo', 'Baggins']]) # Test batch inserts cursor.execute(""" @@ -159,8 +177,12 @@ class TestCQL(UpgradeTester): APPLY BATCH """) - assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 37, None, None], - [UUID('550e8400-e29b-41d4-a716-446655440000'), 36, None, None]]) + if self.is_40_or_greater(): + assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), None, 37, None, None, None], + [UUID('550e8400-e29b-41d4-a716-446655440000'), None, 36, None, None, None]]) + else: + assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 37, None, None], + [UUID('550e8400-e29b-41d4-a716-446655440000'), 36, None, None]]) def test_dynamic_cf(self): """ Test non-composite dynamic CF syntax """ @@ -175,6 +197,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE clicks DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE clicks") @@ -196,8 +222,9 @@ class TestCQL(UpgradeTester): assert_all(cursor, "SELECT time FROM clicks", [[24], [12], [128], [24], [12], [42]]) - # Check we don't allow empty values for url since this is the full underlying cell name (#6152) - assert_invalid(cursor, "INSERT INTO clicks (userid, url, time) VALUES (810e8500-e29b-41d4-a716-446655440000, '', 42)") + if not 
self.is_40_or_greater(): + # Check we don't allow empty values for url since this is the full underlying cell name (#6152) + assert_invalid(cursor, "INSERT INTO clicks (userid, url, time) VALUES (810e8500-e29b-41d4-a716-446655440000, '', 42)") def test_dense_cf(self): """ Test composite 'dense' CF syntax """ @@ -213,6 +240,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE connections DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE connections") @@ -224,8 +255,12 @@ class TestCQL(UpgradeTester): cursor.execute("UPDATE connections SET time = 24 WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.2' AND port = 80") # we don't have to include all of the clustering columns (see CASSANDRA-7990) - cursor.execute("INSERT INTO connections (userid, ip, time) VALUES (f47ac10b-58cc-4372-a567-0e02b2c3d479, '192.168.0.3', 42)") - cursor.execute("UPDATE connections SET time = 42 WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'") + if self.is_40_or_greater(): + cursor.execute("INSERT INTO connections (userid, ip, port, time) VALUES (f47ac10b-58cc-4372-a567-0e02b2c3d479, '192.168.0.3', 80, 42)") + cursor.execute("UPDATE connections SET time = 42 WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4' AND port = 90") + else: + cursor.execute("INSERT INTO connections (userid, ip, time) VALUES (f47ac10b-58cc-4372-a567-0e02b2c3d479, '192.168.0.3', 42)") + cursor.execute("UPDATE connections SET time = 42 WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'") # Queries assert_all(cursor, "SELECT ip, port, time FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000", @@ -239,17 +274,23 @@ class TestCQL(UpgradeTester): assert_none(cursor, "SELECT 
ip, port, time FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000 and ip > '192.168.0.2'") - assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.3'", - ['192.168.0.3', None, 42]) - - assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'", - ['192.168.0.4', None, 42]) + if self.is_40_or_greater(): + assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.3'", + ['192.168.0.3', 80, 42]) + assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'", + ['192.168.0.4', 90, 42]) + else: + assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.3'", + ['192.168.0.3', None, 42]) + assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'", + ['192.168.0.4', None, 42]) # Deletion cursor.execute("DELETE time FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000 AND ip = '192.168.0.2' AND port = 80") res = list(cursor.execute("SELECT * FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000")) - assert_length_equal(res, 2) + #Without compact storage deleting just the column leaves the row behind + assert_length_equal(res, 3 if self.is_40_or_greater() else 2) cursor.execute("DELETE FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000") assert_none(cursor, "SELECT * FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000") @@ -310,6 +351,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE clicks DROP COMPACT STORAGE;") + for is_upgraded, 
cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE clicks") @@ -337,6 +382,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE clicks DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE clicks") @@ -408,6 +457,7 @@ class TestCQL(UpgradeTester): res = list(cursor.execute("SELECT * FROM clicks LIMIT 4")) assert_length_equal(res, 4) + @pytest.mark.skip("https://issues.apache.org/jira/browse/CASSANDRA-14958") def test_counters(self): """ Validate counter support """ cursor = self.prepare() @@ -421,6 +471,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE clicks DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE clicks") @@ -516,6 +570,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE test DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE test") @@ -564,6 +622,11 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE test1 DROP COMPACT STORAGE;") + cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE 
test1") @@ -613,6 +676,10 @@ class TestCQL(UpgradeTester): ); """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE test1 DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE test1") @@ -664,6 +731,11 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE Test DROP COMPACT STORAGE;") + cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE test") @@ -901,6 +973,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE testcf2 DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE testcf") @@ -1485,7 +1561,7 @@ class TestCQL(UpgradeTester): assert_all(cursor, "SELECT * FROM test WHERE token(k1, k2) > " + str(-((2 ** 63) - 1)), [[0, 2, 2, 2], [0, 3, 3, 3], [0, 0, 0, 0], [0, 1, 1, 1]]) - @since('2', max_version='4') + @since('2', max_version='3.99') def test_cql3_insert_thrift(self): """ Check that we can insert from thrift into a CQL3 table @@ -1514,7 +1590,7 @@ class TestCQL(UpgradeTester): key = struct.pack('>i', 2) column_name_component = struct.pack('>i', 4) # component length + component + EOC + component length + component + EOC - column_name = '\x00\x04' + column_name_component + '\x00' + '\x00\x01' + 'v' + '\x00' + column_name = b'\x00\x04' + column_name_component + b'\x00' + b'\x00\x01' + 'v'.encode() + b'\x00' value = struct.pack('>i', 8) client.batch_mutate( {key: {'test': 
[Mutation(ColumnOrSuperColumn(column=Column(name=column_name, value=value, timestamp=100)))]}}, @@ -1522,7 +1598,7 @@ class TestCQL(UpgradeTester): assert_one(cursor, "SELECT * FROM test", [2, 4, 8]) - @since('2', max_version='4') + @since('2', max_version='3.99') def test_cql3_non_compound_range_tombstones(self): """ Checks that 3.0 serializes RangeTombstoneLists correctly @@ -1538,7 +1614,7 @@ class TestCQL(UpgradeTester): client.set_keyspace('ks') # create a CF with mixed static and dynamic cols - column_defs = [ColumnDef('static1', 'Int32Type', None, None, None)] + column_defs = [ColumnDef('static1'.encode(), 'Int32Type', None, None, None)] cfdef = CfDef( keyspace='ks', name='cf', @@ -1568,34 +1644,34 @@ class TestCQL(UpgradeTester): client.set_keyspace('ks') # insert a number of keys so that we'll get rows on both the old and upgraded nodes - for key in ['key{}'.format(i) for i in range(10)]: - logger.debug("Using key " + key) + for key in ['key{}'.format(i).encode() for i in range(10)]: + logger.debug("Using key " + key.decode()) # insert "static" column client.batch_mutate( - {key: {'cf': [Mutation(ColumnOrSuperColumn(column=Column(name='static1', value=struct.pack('>i', 1), timestamp=100)))]}}, + {key: {'cf': [Mutation(ColumnOrSuperColumn(column=Column(name='static1'.encode(), value=struct.pack('>i', 1), timestamp=100)))]}}, ThriftConsistencyLevel.ALL) # insert "dynamic" columns for i, column_name in enumerate(('a', 'b', 'c', 'd', 'e')): column_value = 'val{}'.format(i) client.batch_mutate( - {key: {'cf': [Mutation(ColumnOrSuperColumn(column=Column(name=column_name, value=column_value, timestamp=100)))]}}, + {key: {'cf': [Mutation(ColumnOrSuperColumn(column=Column(name=column_name.encode(), value=column_value.encode(), timestamp=100)))]}}, ThriftConsistencyLevel.ALL) # sanity check on the query - fetch_slice = SlicePredicate(slice_range=SliceRange('', '', False, 100)) + fetch_slice = SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 
100)) row = client.get_slice(key, ColumnParent(column_family='cf'), fetch_slice, ThriftConsistencyLevel.ALL) assert 6 == len(row), row - cols = OrderedDict([(cosc.column.name, cosc.column.value) for cosc in row]) + cols = OrderedDict([(cosc.column.name.decode(), cosc.column.value) for cosc in row]) logger.debug(cols) assert ['a', 'b', 'c', 'd', 'e', 'static1'] == list(cols.keys()) - assert 'val0' == cols['a'] - assert 'val4' == cols['e'] + assert 'val0'.encode() == cols['a'] + assert 'val4'.encode() == cols['e'] assert struct.pack('>i', 1) == cols['static1'] # delete a slice of dynamic columns - slice_range = SliceRange('b', 'd', False, 100) + slice_range = SliceRange('b'.encode(), 'd'.encode(), False, 100) client.batch_mutate( {key: {'cf': [Mutation(deletion=Deletion(timestamp=101, predicate=SlicePredicate(slice_range=slice_range)))]}}, ThriftConsistencyLevel.ALL) @@ -1603,11 +1679,11 @@ class TestCQL(UpgradeTester): # check remaining columns row = client.get_slice(key, ColumnParent(column_family='cf'), fetch_slice, ThriftConsistencyLevel.ALL) assert 3 == len(row), row - cols = OrderedDict([(cosc.column.name, cosc.column.value) for cosc in row]) + cols = OrderedDict([(cosc.column.name.decode(), cosc.column.value) for cosc in row]) logger.debug(cols) assert ['a', 'e', 'static1'] == list(cols.keys()) - assert 'val0' == cols['a'] - assert 'val4' == cols['e'] + assert 'val0'.encode() == cols['a'] + assert 'val4'.encode() == cols['e'] assert struct.pack('>i', 1) == cols['static1'] def test_row_existence(self): @@ -1673,6 +1749,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE test") @@ -1692,7 +1772,9 @@ class TestCQL(UpgradeTester): cursor.execute(q, (k, c)) query = "SELECT 
* FROM test2" - assert_all(cursor, query, [[x, y] for x in range(0, 2) for y in range(0, 2)]) + expected = [[x, y, None] for x in range(0, 2) for y in range(0, 2)] if self.is_40_or_greater() else [ + [x, y] for x in range(0, 2) for y in range(0, 2)] + assert_all(cursor, query, expected) def test_no_clustering(self): cursor = self.prepare() @@ -2043,6 +2125,11 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE test1 DROP COMPACT STORAGE;") + cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE test1") @@ -2108,6 +2195,10 @@ class TestCQL(UpgradeTester): AND CLUSTERING ORDER BY(c1 DESC, c2 DESC); """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE test DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE test") @@ -2354,6 +2445,7 @@ class TestCQL(UpgradeTester): query = "SELECT blog_id, content FROM blogs WHERE author='foo'" assert_all(cursor, query, [[1, set(['bar1', 'bar2'])], [1, set(['bar2', 'bar3'])], [2, set(['baz'])]]) + @pytest.mark.skip("https://issues.apache.org/jira/browse/CASSANDRA-14961") def test_truncate_clean_cache(self): cursor = self.prepare(ordered=True, use_cache=True) @@ -2412,7 +2504,7 @@ class TestCQL(UpgradeTester): for i in random.sample(range(nb_keys), nb_deletes): cursor.execute("DELETE FROM test WHERE k = {}".format(i)) - res = list(cursor.execute("SELECT * FROM test LIMIT {}".format(nb_keys / 2))) + res = list(cursor.execute("SELECT * FROM test LIMIT {}".format(int(nb_keys / 2)))) assert_length_equal(res, nb_keys / 2) def test_collection_function(self): @@ -2497,6 +2589,10 @@ class 
TestCQL(UpgradeTester): cursor.execute(create) + #4.0 doesn't support compact storage + if compact and self.is_40_or_greater(): + cursor.execute("ALTER TABLE zipcodes DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE zipcodes") @@ -2551,6 +2647,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE test DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE test") @@ -2675,12 +2775,16 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE; """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE bar DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE bar") cursor.execute("INSERT INTO bar (id, i) VALUES (1, 2);") - assert_one(cursor, "SELECT * FROM bar", [1, 2]) + assert_one(cursor, "SELECT * FROM bar", [1, None, 2, None] if self.is_40_or_greater() else [1, 2]) def test_query_compact_tables_during_upgrade(self): """ @@ -2708,6 +2812,10 @@ class TestCQL(UpgradeTester): [(i, i) for i in range(100)]) self.cluster.flush() + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE t1 DROP COMPACT STORAGE;") + def check_read_all(cursor): read_count = 0 # first read each row separately - obviously, we should be able to retrieve all 100 @@ -2895,7 +3003,7 @@ class TestCQL(UpgradeTester): assert_one(cursor, query, [3, 'foobar']) # Fixed by CASSANDRA-12654 in 3.12 - @since('2.0', max_version='3.12') + @since('2.0', max_version='3.11.99') def test_IN_clause_on_last_key(self): """ Tests patch to improve 
validation by not throwing an assertion when using map, list, or set @@ -2960,9 +3068,9 @@ class TestCQL(UpgradeTester): cursor.execute("TRUNCATE test") cursor.execute("INSERT INTO test (k, b) VALUES (0, 0x)") - assert_one(cursor, "SELECT * FROM test", [0, '']) + assert_one(cursor, "SELECT * FROM test", [0, ''.encode()]) - @since('2', max_version='4') + @since('2', max_version='3.99') def test_rename(self): cursor = self.prepare(start_rpc=True) @@ -3225,7 +3333,7 @@ class TestCQL(UpgradeTester): # test aliasing a regular function res = cursor.execute('SELECT intAsBlob(id) AS id_blob FROM users WHERE id = 0') assert 'id_blob' == res[0]._fields[0] - assert '\x00\x00\x00\x00' == res[0].id_blob + assert '\x00\x00\x00\x00' == res[0].id_blob.decode() logger.debug("Current node version is {}".format(self.get_node_version(is_upgraded))) @@ -3261,6 +3369,10 @@ class TestCQL(UpgradeTester): # Same test, but for compact cursor.execute("CREATE TABLE test_compact (k1 int, k2 int, v int, PRIMARY KEY (k1, k2)) WITH COMPACT STORAGE") + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE test_compact DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE test") @@ -3331,6 +3443,11 @@ class TestCQL(UpgradeTester): # Test a 'wide row' thrift table. 
cursor.execute('CREATE TABLE wide (pk int, name text, val int, PRIMARY KEY(pk, name)) WITH COMPACT STORAGE') + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE compact DROP COMPACT STORAGE;") + cursor.execute("ALTER TABLE wide DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE regular") @@ -4194,7 +4311,7 @@ class TestCQL(UpgradeTester): assert_invalid(cursor, "SELECT v1, v2, v3 FROM test WHERE k = 0 AND (v1, v3) > (1, 0)") - @since('2.0', max_version='3') # 3.0+ not compatible with protocol version 2 + @since('2.0', max_version='2.99') # 3.0+ not compatible with protocol version 2 def test_v2_protocol_IN_with_tuples(self): """ @jira_ticket CASSANDRA-8062 @@ -4296,6 +4413,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE lock DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE lock") @@ -4868,6 +4989,10 @@ class TestCQL(UpgradeTester): ) WITH COMPACT STORAGE """) + #4.0 doesn't support compact storage + if self.is_40_or_greater(): + cursor.execute("ALTER TABLE test DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) cursor.execute("TRUNCATE test") @@ -4879,9 +5004,14 @@ class TestCQL(UpgradeTester): assert_all(cursor, "SELECT v FROM test WHERE k=0 AND v > 0 AND v <= 4 LIMIT 2", [[1], [2]]) assert_all(cursor, "SELECT v FROM test WHERE k=0 AND v > -1 AND v <= 4 LIMIT 2", [[0], [1]]) - assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 2", [[0, 1], [0, 2]]) - assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) 
AND v > -1 AND v <= 4 LIMIT 2", [[0, 0], [0, 1]]) - assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 6", [[0, 1], [0, 2], [0, 3], [1, 1], [1, 2], [1, 3]]) + if self.is_40_or_greater(): + assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 2", [[0, 1, None], [0, 2, None]]) + assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > -1 AND v <= 4 LIMIT 2", [[0, 0, None], [0, 1, None]]) + assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 6", [[0, 1, None], [0, 2, None], [0, 3, None], [1, 1, None], [1, 2, None], [1, 3, None]]) + else: + assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 2", [[0, 1], [0, 2]]) + assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > -1 AND v <= 4 LIMIT 2", [[0, 0], [0, 1]]) + assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 6", [[0, 1], [0, 2], [0, 3], [1, 1], [1, 2], [1, 3]]) # This doesn't work -- see #7059 # assert_all(cursor, "SELECT * FROM test WHERE v > 1 AND v <= 3 LIMIT 6 ALLOW FILTERING", [[1, 2], [1, 3], [0, 2], [0, 3], [2, 2], [2, 3]]) @@ -5050,6 +5180,7 @@ class TestCQL(UpgradeTester): # A blob that is not 4 bytes should be rejected assert_invalid(cursor, "INSERT INTO test(k, v) VALUES (0, blobAsInt(0x01))") + @pytest.mark.skip("https://issues.apache.org/jira/browse/CASSANDRA-14960") def test_invalid_string_literals(self): """ @jira_ticket CASSANDRA-8101 @@ -5062,10 +5193,9 @@ class TestCQL(UpgradeTester): cursor.execute("TRUNCATE invalid_string_literals") assert_invalid(cursor, "insert into ks.invalid_string_literals (k, a) VALUES (0, '\u038E\u0394\u03B4\u03E0')") - # since the protocol requires strings to be valid UTF-8, the error response to this is a ProtocolError try: - cursor.execute("insert into ks.invalid_string_literals (k, c) VALUES (0, '\xc2\x01')") + cursor.execute("insert into ks.invalid_string_literals (k, b) 
VALUES (0, '\xc2\x01')") self.fail("Expected error") except ProtocolException as e: assert "Cannot decode string as UTF8" in str(e) @@ -5279,6 +5409,7 @@ class TestCQL(UpgradeTester): assert_none(cursor, "select * from space1.table1 where a=1 and b=1") + @pytest.mark.skip("https://issues.apache.org/jira/browse/CASSANDRA-14961") def test_secondary_index_query(self): """ Test for fix to bug where secondary index cannot be queried due to Column Family caching changes. @@ -5335,7 +5466,7 @@ class TestCQL(UpgradeTester): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) assert_all(cursor, "SELECT k FROM ks.test WHERE v = 0", [[0]]) - def test_tracing_prevents_startup_after_upgrading(self): + def test_tracing_prevents_startup_after_upgrading(self, fixture_dtest_setup): """ Test that after upgrading from 2.1 to 3.0, the system_traces.sessions table is properly upgraded to include the client column. @@ -5346,6 +5477,13 @@ class TestCQL(UpgradeTester): cursor.execute("CREATE KEYSPACE foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}") cursor.execute("CREATE TABLE foo.bar (k int PRIMARY KEY, v int)") + #It's possible to log an error when reading trace information because the schema at node differs + #between versions + if self.is_40_or_greater(): + fixture_dtest_setup.ignore_log_patterns = fixture_dtest_setup.ignore_log_patterns +\ + ["Unknown column coordinator_port during deserialization", + "Unknown column source_port during deserialization"] + for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) @@ -5401,6 +5539,7 @@ for spec in specs: assert gen_class_name not in globals() upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or spec['UPGRADE_PATH'].upgrade_meta.matches_current_env_version_family + cls = type(gen_class_name, (TestCQL,), spec) if not upgrade_applies_to_env: - pytest.skip('test not applicable to env.') - globals()[gen_class_name] 
= type(gen_class_name, (TestCQL,), spec) + add_skip(cls, 'test not applicable to env.') + globals()[gen_class_name] = cls diff --git a/upgrade_tests/paging_test.py b/upgrade_tests/paging_test.py index e21ba88..11df0b9 100644 --- a/upgrade_tests/paging_test.py +++ b/upgrade_tests/paging_test.py @@ -9,10 +9,12 @@ from cassandra import InvalidRequest from cassandra.query import SimpleStatement, dict_factory, named_tuple_factory from ccmlib.common import LogPatternToVersion -from dtest import RUN_STATIC_UPGRADE_MATRIX, run_scenarios -from tools.assertions import assert_read_timeout_or_failure +from dtest import RUN_STATIC_UPGRADE_MATRIX, run_scenarios, MAJOR_VERSION_4 + +from tools.assertions import (assert_read_timeout_or_failure, assert_lists_equal_ignoring_order) from tools.data import rows_to_list from tools.datahelp import create_rows, flatten_into_set, parse_data_into_dicts +from tools.misc import add_skip from tools.paging import PageAssertionMixin, PageFetcher from .upgrade_base import UpgradeTester from .upgrade_manifest import build_upgrade_pairs @@ -121,7 +123,7 @@ class TestPagingSize(BasePagingTester, PageAssertionMixin): assert pf.num_results_all() == [5, 4] # make sure expected and actual have same data elements (ignoring order) - self.assertEqualIgnoreOrder(pf.all_data(), expected_data) + assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='value') def test_with_equal_results_to_page_size(self): cursor = self.prepare() @@ -151,7 +153,7 @@ class TestPagingSize(BasePagingTester, PageAssertionMixin): assert pf.pagecount() == 1 # make sure expected and actual have same data elements (ignoring order) - self.assertEqualIgnoreOrder(pf.all_data(), expected_data) + assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='value') def test_undefined_page_size_default(self): """ @@ -183,7 +185,7 @@ class TestPagingSize(BasePagingTester, PageAssertionMixin): self.maxDiff = None # make sure expected and actual have same data 
elements (ignoring order) - self.assertEqualIgnoreOrder(pf.all_data(), expected_data) + assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='value') class TestPagingWithModifiers(BasePagingTester, PageAssertionMixin): @@ -422,7 +424,7 @@ class TestPagingWithModifiers(BasePagingTester, PageAssertionMixin): assert pf.num_results_all() == [4, 3] # make sure the allow filtering query matches the expected results (ignoring order) - self.assertEqualIgnoreOrder( + assert_lists_equal_ignoring_order( pf.all_data(), parse_data_into_dicts( """ @@ -435,7 +437,8 @@ class TestPagingWithModifiers(BasePagingTester, PageAssertionMixin): |8 |and more testing| |9 |and more testing| """, format_funcs={'id': int, 'value': str} - ) + ), + sort_key='value' ) @@ -457,21 +460,30 @@ class TestPagingData(BasePagingTester, PageAssertionMixin): ); """) - cursor.execute(""" - CREATE TABLE test2 ( - k int, - c int, - v text, - PRIMARY KEY (k, c) - ) WITH COMPACT STORAGE; - """) + testing_compact_storage = self.cluster.version() <= MAJOR_VERSION_4 + if testing_compact_storage: + cursor.execute(""" + CREATE TABLE test2 ( + k int, + c int, + v text, + PRIMARY KEY (k, c) + ) WITH COMPACT STORAGE; + """) + + version_string = self.upgrade_version_string() + #4.0 doesn't support compact storage + if version_string == 'trunk' or version_string >= MAJOR_VERSION_4: + cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;") for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying %s node" % ("upgraded" if is_upgraded else "old",)) cursor.execute("TRUNCATE test") - cursor.execute("TRUNCATE test2") + if testing_compact_storage: + cursor.execute("TRUNCATE test2") - for table in ("test", "test2"): + tables = ("test", "test2") if testing_compact_storage else ("test") + for table in tables: logger.debug("Querying table %s" % (table,)) expected = [] # match the key ordering for murmur3 @@ -503,22 +515,31 @@ class TestPagingData(BasePagingTester, PageAssertionMixin): ); """) 
- cursor.execute(""" - CREATE TABLE test2 ( - k int, - c1 int, - c2 int, - v text, - PRIMARY KEY (k, c1, c2) - ) WITH COMPACT STORAGE; - """) + testing_compact_storage = self.cluster.version() <= MAJOR_VERSION_4 + if testing_compact_storage: + cursor.execute(""" + CREATE TABLE test2 ( + k int, + c1 int, + c2 int, + v text, + PRIMARY KEY (k, c1, c2) + ) WITH COMPACT STORAGE; + """) + + version_string = self.upgrade_version_string() + #4.0 doesn't support compact storage + if version_string == 'trunk' or version_string >= MAJOR_VERSION_4: + cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;") for is_upgraded, cursor in self.do_upgrade(cursor): logger.debug("Querying %s node" % ("upgraded" if is_upgraded else "old",)) cursor.execute("TRUNCATE test") - cursor.execute("TRUNCATE test2") + if testing_compact_storage: + cursor.execute("TRUNCATE test2") - for table in ("test", "test2"): + tables = ("test", "test2") if testing_compact_storage else ("test") + for table in tables: logger.debug("Querying table %s" % (table,)) expected = [] # match the key ordering for murmur3 @@ -566,7 +587,7 @@ class TestPagingData(BasePagingTester, PageAssertionMixin): all_results = pf.all_data() assert len(expected_data) == len(all_results) self.maxDiff = None - self.assertEqualIgnoreOrder(expected_data, all_results) + assert_lists_equal_ignoring_order(expected_data, all_results, sort_key='value') def test_paging_across_multi_wide_rows(self): cursor = self.prepare() @@ -635,7 +656,7 @@ class TestPagingData(BasePagingTester, PageAssertionMixin): assert pf.pagecount() == 2 assert pf.num_results_all() == [400, 200] - self.assertEqualIgnoreOrder(expected_data, pf.all_data()) + assert_lists_equal_ignoring_order(expected_data, pf.all_data(), sort_key='sometext') @since('2.0.6') def test_static_columns_paging(self): @@ -902,7 +923,7 @@ class TestPagingData(BasePagingTester, PageAssertionMixin): assert pf.pagecount() == 2 assert pf.num_results_all() == [400, 200] - 
self.assertEqualIgnoreOrder(expected_data, pf.all_data()) + assert_lists_equal_ignoring_order(expected_data, pf.all_data(), sort_key='sometext') class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin): @@ -944,7 +965,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin): assert pf.pagecount() == 2 assert pf.num_results_all(), [501 == 499] - self.assertEqualIgnoreOrder(pf.all_data(), expected_data) + assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='mytext') def test_data_change_impacting_later_page(self): cursor = self.prepare() @@ -981,7 +1002,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin): # add the new row to the expected data and then do a compare expected_data.append({'id': 2, 'mytext': 'foo'}) - self.assertEqualIgnoreOrder(pf.all_data(), expected_data) + assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='mytext') def test_row_TTL_expiry_during_paging(self): cursor = self.prepare() @@ -1064,7 +1085,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin): # no need to request page here, because the first page is automatically retrieved page1 = pf.page_data(1) - self.assertEqualIgnoreOrder(page1, data[:500]) + assert_lists_equal_ignoring_order(page1, data[:500], sort_key="mytext") # set some TTLs for data on page 3 for row in data[1000:1500]: @@ -1080,7 +1101,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin): # check page two pf.request_one() page2 = pf.page_data(2) - self.assertEqualIgnoreOrder(page2, data[500:1000]) + assert_lists_equal_ignoring_order(page2, data[500:1000], sort_key="mytext") page3expected = [] for row in data[1000:1500]: @@ -1093,7 +1114,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin): pf.request_one() page3 = pf.page_data(3) - self.assertEqualIgnoreOrder(page3, page3expected) + assert_lists_equal_ignoring_order(page3, page3expected, sort_key="mytext") class 
TestPagingQueryIsolation(BasePagingTester, PageAssertionMixin): @@ -1476,13 +1497,11 @@ class TestPagingWithDeletions(BasePagingTester, PageAssertionMixin): def test_failure_threshold_deletions(self): """Test that paging throws a failure in case of tombstone threshold """ self.fixture_dtest_setup.allow_log_errors = True - self.cluster.set_configuration_options( - values={'tombstone_failure_threshold': 500, - 'read_request_timeout_in_ms': 1000, - 'request_timeout_in_ms': 1000, - 'range_request_timeout_in_ms': 1000} - ) - cursor = self.prepare() + cursor = self.prepare( + extra_config_options={'tombstone_failure_threshold': 500, + 'read_request_timeout_in_ms': 1000, + 'request_timeout_in_ms': 1000, + 'range_request_timeout_in_ms': 1000}) nodes = self.cluster.nodelist() self.setup_schema(cursor) @@ -1530,6 +1549,7 @@ for klaus in BasePagingTester.__subclasses__(): assert gen_class_name not in globals() upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or spec['UPGRADE_PATH'].upgrade_meta.matches_current_env_version_family + cls = type(gen_class_name, (klaus,), spec) if not upgrade_applies_to_env: - pytest.mark.skip(reason='test not applicable to env.') - globals()[gen_class_name] = type(gen_class_name, (klaus,), spec) + add_skip(cls, 'test not applicable to env.') + globals()[gen_class_name] = cls diff --git a/upgrade_tests/regression_test.py b/upgrade_tests/regression_test.py index 2e16645..e36e19a 100644 --- a/upgrade_tests/regression_test.py +++ b/upgrade_tests/regression_test.py @@ -12,6 +12,7 @@ from cassandra import ConsistencyLevel as CL from dtest import RUN_STATIC_UPGRADE_MATRIX from tools.jmxutils import (JolokiaAgent, make_mbean) +from tools.misc import add_skip from .upgrade_base import UpgradeTester from .upgrade_manifest import build_upgrade_pairs @@ -145,8 +146,8 @@ class TestForRegressions(UpgradeTester): logger.debug(response) schemas = response.split('Schema versions:')[1].strip() num_schemas = len(re.findall(r'\[.*?\]', schemas)) - 
self.assertEqual(num_schemas, 1, "There were multiple schema versions during an upgrade: {}" - .format(schemas)) + assert num_schemas == 1, "There were multiple schema versions during an upgrade: {}" \ + .format(schemas) for node in self.cluster.nodelist(): validate_schema_agreement(node, False) @@ -180,6 +181,7 @@ for path in build_upgrade_pairs(): '__test__': True} upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or path.upgrade_meta.matches_current_env_version_family + cls = type(gen_class_name, (TestForRegressions,), spec) if not upgrade_applies_to_env: - pytest.mark.skip(reason='test not applicable to env.') - globals()[gen_class_name] = type(gen_class_name, (TestForRegressions,), spec) + add_skip(cls, 'test not applicable to env.') + globals()[gen_class_name] = cls diff --git a/upgrade_tests/repair_test.py b/upgrade_tests/repair_test.py index 9ac45a9..36eb5cc 100644 --- a/upgrade_tests/repair_test.py +++ b/upgrade_tests/repair_test.py @@ -15,10 +15,10 @@ LEGACY_SSTABLES_JVM_ARGS = ["-Dcassandra.streamdes.initial_mem_buffer_size=1", # We don't support directly upgrading from 2.2 to 4.0 so disabling this on 4.0. # TODO: we should probably not hardcode versions? 
@pytest.mark.upgrade_test -@since('3.0', max_version='4') +@since('3.0', max_version='3.99') class TestUpgradeRepair(BaseRepairTest): - @since('3.0') + @since('3.0', max_version='3.99') def test_repair_after_upgrade(self): """ @jira_ticket CASSANDRA-10990 diff --git a/upgrade_tests/storage_engine_upgrade_test.py b/upgrade_tests/storage_engine_upgrade_test.py index 4cc718c..c1fc228 100644 --- a/upgrade_tests/storage_engine_upgrade_test.py +++ b/upgrade_tests/storage_engine_upgrade_test.py @@ -3,7 +3,7 @@ import time import pytest import logging -from dtest import Tester +from dtest import Tester, MAJOR_VERSION_4 from sstable_generation_loading_test import TestBaseSStableLoader from thrift_bindings.thrift010.Cassandra import (ConsistencyLevel, Deletion, Mutation, SlicePredicate, @@ -25,13 +25,11 @@ LEGACY_SSTABLES_JVM_ARGS = ["-Dcassandra.streamdes.initial_mem_buffer_size=1", @since('3.0') class TestStorageEngineUpgrade(Tester): - def setUp(self, bootstrap=False, jvm_args=None): - super(TestStorageEngineUpgrade, self).setUp() - self.default_install_dir = self.cluster.get_install_dir() - self.bootstrap = bootstrap - if jvm_args is None: - jvm_args = [] - self.jvm_args = jvm_args + @pytest.fixture(autouse=True) + def storage_engine_upgrade_setup(self, fixture_dtest_setup): + self.fixture_dtest_setup.default_install_dir = fixture_dtest_setup.cluster.get_install_dir() + self.fixture_dtest_setup.bootstrap = False + return () def _setup_cluster(self, create_keyspace=True, cluster_options=None): cluster = self.cluster @@ -40,10 +38,11 @@ class TestStorageEngineUpgrade(Tester): cluster.set_configuration_options(cluster_options) # Forcing cluster version on purpose - if self.dtest_config.cassandra_version_from_build >= '4': + if self.dtest_config.cassandra_version_from_build >= MAJOR_VERSION_4: cluster.set_install_dir(version="git:cassandra-3.0") else: cluster.set_install_dir(version="git:cassandra-2.1") + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() 
cluster.populate(1).start() node1 = cluster.nodelist()[0] @@ -62,14 +61,14 @@ class TestStorageEngineUpgrade(Tester): time.sleep(.5) node1.stop(wait_other_notice=True) - node1.set_install_dir(install_dir=self.default_install_dir) + node1.set_install_dir(install_dir=self.fixture_dtest_setup.default_install_dir) node1.start(wait_other_notice=True, wait_for_binary_proto=True) - if self.bootstrap: - cluster.set_install_dir(install_dir=self.default_install_dir) + if self.fixture_dtest_setup.bootstrap: + cluster.set_install_dir(install_dir=self.fixture_dtest_setup.default_install_dir) # Add a new node, bootstrap=True ensures that it is not a seed node2 = new_node(cluster, bootstrap=True) - node2.start(wait_for_binary_proto=True, jvm_args=self.jvm_args) + node2.start(wait_for_binary_proto=True, jvm_args=self.fixture_dtest_setup.jvm_args) temp_files = self.glob_data_dirs(os.path.join('*', "tmp", "*.dat")) logger.debug("temp files: " + str(temp_files)) @@ -140,6 +139,10 @@ class TestStorageEngineUpgrade(Tester): for r in range(ROWS): session.execute("INSERT INTO t(k, t, v) VALUES ({n}, {r}, {r})".format(n=n, r=r)) + #4.0 doesn't support compact storage + if compact_storage and self.dtest_config.cassandra_version_from_build >= MAJOR_VERSION_4: + session.execute("ALTER TABLE t DROP COMPACT STORAGE;") + session = self._do_upgrade() for n in range(PARTITIONS): @@ -151,8 +154,8 @@ class TestStorageEngineUpgrade(Tester): [[n, v, v] for v in range(ROWS - 1, -1, -1)]) # Querying a "large" slice - start = ROWS / 10 - end = ROWS - 1 - (ROWS / 10) + start = ROWS // 10 + end = ROWS - 1 - (ROWS // 10) assert_all(session, "SELECT * FROM t WHERE k = {n} AND t >= {start} AND t < {end}".format(n=n, start=start, end=end), [[n, v, v] for v in range(start, end)]) @@ -161,8 +164,8 @@ class TestStorageEngineUpgrade(Tester): [[n, v, v] for v in range(end - 1, start - 1, -1)]) # Querying a "small" slice - start = ROWS / 2 - end = ROWS / 2 + 5 + start = ROWS // 2 + end = ROWS // 2 + 5 
assert_all(session, "SELECT * FROM t WHERE k = {n} AND t >= {start} AND t < {end}".format(n=n, start=start, end=end), [[n, v, v] for v in range(start, end)]) @@ -179,8 +182,8 @@ class TestStorageEngineUpgrade(Tester): [[n, v, v] for v in range(ROWS - 1, -1, -1)]) # Querying a "large" slice - start = ROWS / 10 - end = ROWS - 1 - (ROWS / 10) + start = ROWS // 10 + end = ROWS - 1 - (ROWS // 10) assert_all(session, "SELECT * FROM t WHERE k = {n} AND t >= {start} AND t < {end}".format(n=n, start=start, end=end), [[n, v, v] for v in range(start, end)]) @@ -189,8 +192,8 @@ class TestStorageEngineUpgrade(Tester): [[n, v, v] for v in range(end - 1, start - 1, -1)]) # Querying a "small" slice - start = ROWS / 2 - end = ROWS / 2 + 5 + start = ROWS // 2 + end = ROWS // 2 + 5 assert_all(session, "SELECT * FROM t WHERE k = {n} AND t >= {start} AND t < {end}".format(n=n, start=start, end=end), [[n, v, v] for v in range(start, end)]) @@ -209,15 +212,25 @@ class TestStorageEngineUpgrade(Tester): for n in range(PARTITIONS): session.execute("INSERT INTO t(k, v1, v2, v3, v4) VALUES ({}, {}, {}, {}, {})".format(n, n + 1, n + 2, n + 3, n + 4)) + is40 = self.dtest_config.cassandra_version_from_build >= MAJOR_VERSION_4 + if compact_storage and is40: + session.execute("ALTER TABLE t DROP COMPACT STORAGE;") + session = self._do_upgrade() + def maybe_add_compact_columns(expected): + if is40 and compact_storage: + expected.insert(1, None) + expected.append(None) + return expected + for n in range(PARTITIONS): - assert_one(session, "SELECT * FROM t WHERE k = {}".format(n), [n, n + 1, n + 2, n + 3, n + 4]) + assert_one(session, "SELECT * FROM t WHERE k = {}".format(n), maybe_add_compact_columns([n, n + 1, n + 2, n + 3, n + 4])) self.cluster.compact() for n in range(PARTITIONS): - assert_one(session, "SELECT * FROM t WHERE k = {}".format(n), [n, n + 1, n + 2, n + 3, n + 4]) + assert_one(session, "SELECT * FROM t WHERE k = {}".format(n), maybe_add_compact_columns([n, n + 1, n + 2, n + 3, n + 4])) 
def test_upgrade_with_statics(self): self.upgrade_with_statics(rows=10) @@ -402,7 +415,7 @@ class TestStorageEngineUpgrade(Tester): assert_one(session, "SELECT k FROM t", ['some_key']) - @since('3.0', max_version='4') + @since('3.0', max_version='3.99') def test_upgrade_with_range_tombstone_eoc_0(self): """ Check sstable upgrading when the sstable contains a range tombstone with EOC=0. @@ -457,35 +470,68 @@ class TestStorageEngineUpgrade(Tester): @since('3.0') class TestBootstrapAfterUpgrade(TestStorageEngineUpgrade): - def setUp(self): - super(TestBootstrapAfterUpgrade, self).setUp(bootstrap=True, jvm_args=LEGACY_SSTABLES_JVM_ARGS) - + @pytest.fixture(autouse=True) + def set_up(self, storage_engine_upgrade_setup): + self.fixture_dtest_setup.bootstrap=True + self.fixture_dtest_setup.jvm_args=LEGACY_SSTABLES_JVM_ARGS @pytest.mark.upgrade_test -@since('3.0', max_version='4') +@since('3.0', max_version='3.99') class TestLoadKaSStables(TestBaseSStableLoader): - upgrade_from = '2.1.6' + upgrade_test = True + upgrade_from = '2.1.20' jvm_args = LEGACY_SSTABLES_JVM_ARGS @pytest.mark.upgrade_test -@since('3.0', max_version='4') +@since('3.0', max_version='3.99') class TestLoadKaCompactSStables(TestBaseSStableLoader): - upgrade_from = '2.1.6' + upgrade_test = True + upgrade_from = '2.1.20' jvm_args = LEGACY_SSTABLES_JVM_ARGS - compact = True + test_compact = True @pytest.mark.upgrade_test -@since('3.0', max_version='4') +@since('3.0', max_version='3.99') class TestLoadLaSStables(TestBaseSStableLoader): - upgrade_from = '2.2.4' + upgrade_test = True + upgrade_from = '2.2.13' jvm_args = LEGACY_SSTABLES_JVM_ARGS @pytest.mark.upgrade_test -@since('3.0', max_version='4') +@since('3.0', max_version='3.99') class TestLoadLaCompactSStables(TestBaseSStableLoader): - upgrade_from = '2.2.4' + upgrade_test = True + upgrade_from = '2.2.13' jvm_args = LEGACY_SSTABLES_JVM_ARGS - compact = True + test_compact = True + + +@pytest.mark.upgrade_test +@since('4.0', max_version='4.99') +class 
TestLoadMdSStables(TestBaseSStableLoader): + upgrade_from = '3.0.17' + + +@pytest.mark.upgrade_test +@pytest.mark.skip("4.0 sstableloader can't handle formerly compact tables even after drop compact storage, rebuild, cleanup") +@since('4.0', max_version='4.99') +class TestLoadMdCompactSStables(TestBaseSStableLoader): + upgrade_from = '3.0.17' + test_compact = True + + +@pytest.mark.upgrade_test +@since('4.0', max_version='4.99') +class TestLoadMdThreeOneOneSStables(TestBaseSStableLoader): + upgrade_from = '3.11.3' + + +@pytest.mark.upgrade_test +@pytest.mark.skip("4.0 sstableloader can't handle formerly compact tables even after drop compact storage, rebuild, cleanup") +@since('4.0', max_version='4.99') +class TestLoadMdThreeOneOneCompactSStables(TestBaseSStableLoader): + upgrade_from = '3.11.3' + test_compact = True diff --git a/upgrade_tests/thrift_upgrade_test.py b/upgrade_tests/thrift_upgrade_test.py index 42343a2..b427660 100644 --- a/upgrade_tests/thrift_upgrade_test.py +++ b/upgrade_tests/thrift_upgrade_test.py @@ -10,7 +10,8 @@ from thrift_bindings.thrift010.Cassandra import (Column, ColumnDef, ColumnParent, ConsistencyLevel, SlicePredicate, SliceRange) from thrift_test import _i64, get_thrift_client -from tools.assertions import assert_length_equal +from tools.assertions import assert_length_equal, assert_lists_of_dicts_equal +from tools.misc import wait_for_agreement, add_skip from .upgrade_base import UpgradeTester from .upgrade_manifest import build_upgrade_pairs @@ -18,91 +19,173 @@ since = pytest.mark.since logger = logging.getLogger(__name__) -def _create_dense_super_cf(name): - return Cassandra.CfDef('ks', name, column_type='Super', +def _create_dense_super_cf(thrift, name): + cfdef = Cassandra.CfDef('ks', name, column_type='Super', key_validation_class='AsciiType', # pk comparator_type='AsciiType', # ck default_validation_class='AsciiType', # SC value subcomparator_type='LongType') # SC key + thrift.system_add_column_family(cfdef) + 
wait_for_agreement(thrift) -def _create_sparse_super_cf(name): - cd1 = ColumnDef('col1', 'LongType', None, None) - cd2 = ColumnDef('col2', 'LongType', None, None) - return Cassandra.CfDef('ks', name, column_type='Super', +def _create_sparse_super_cf(thrift, name): + cd1 = ColumnDef('col1'.encode(), 'LongType', None, None) + cd2 = ColumnDef('col2'.encode(), 'LongType', None, None) + cfdef = Cassandra.CfDef('ks', name, column_type='Super', column_metadata=[cd1, cd2], key_validation_class='AsciiType', comparator_type='AsciiType', subcomparator_type='AsciiType') + thrift.system_add_column_family(cfdef) + wait_for_agreement(thrift) -def _validate_sparse_cql(cursor, cf='sparse_super_1', column1='column1', col1='col1', col2='col2', key='key'): +def unpack(lst): + result_list = [] + for item_dict in lst: + normalized_dict = {} + for key, value in item_dict.items(): + if hasattr(value, "items"): + assert(key == '') + for a, b in value.items(): + normalized_dict[a] = b + else: + normalized_dict[key] = value + result_list.append(normalized_dict) + return result_list + + +def add_value(list): + """Helper for _validate_sparse_cql to modify expected results based""" + for item in list: + key = item.get('key', None) + if key is None: + key = item.get('renamed_key') + + value_key = 'value1' if key == 'k1' else 'value2' + item[value_key]=_i64(100) + + +def _validate_sparse_cql(cursor, cf='sparse_super_1', column1='column1', col1='col1', col2='col2', key='key', is_version_4_or_greater=False): cursor.execute('use ks') - assert (list(cursor.execute("SELECT * FROM {}".format(cf))) == - [{key: 'k1', column1: 'key1', col1: 200, col2: 300}, - {key: 'k1', column1: 'key2', col1: 200, col2: 300}, - {key: 'k2', column1: 'key1', col1: 200, col2: 300}, - {key: 'k2', column1: 'key2', col1: 200, col2: 300}]) + result = unpack(list(cursor.execute("SELECT * FROM {}".format(cf)))) + + expected = [{key: 'k1', column1: 'key1', col1: 200, col2: 300}, + {key: 'k1', column1: 'key2', col1: 200, col2: 
300}, + {key: 'k2', column1: 'key1', col1: 200, col2: 300}, + {key: 'k2', column1: 'key2', col1: 200, col2: 300}] + if is_version_4_or_greater: + add_value(expected) + assert_lists_of_dicts_equal(result, expected) - assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))) == - [{key: 'k1', column1: 'key1', col1: 200, col2: 300}, - {key: 'k1', column1: 'key2', col1: 200, col2: 300}]) + result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key)))) + expected = [{key: 'k1', column1: 'key1', col1: 200, col2: 300}, + {key: 'k1', column1: 'key2', col1: 200, col2: 300}] + if is_version_4_or_greater: + add_value(expected) + assert_lists_of_dicts_equal(result, expected) - assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k2' AND {} = 'key1'".format(cf, key, column1))) == - [{key: 'k2', column1: 'key1', col1: 200, col2: 300}]) + result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k2' AND {} = 'key1'".format(cf, key, column1)))) + expected = [{key: 'k2', column1: 'key1', col1: 200, col2: 300}] + if is_version_4_or_greater: + add_value(expected) + assert_lists_of_dicts_equal(result, expected) def _validate_sparse_thrift(client, cf='sparse_super_1'): - client.transport.open() + try: + client.transport.open() + except: + pass client.set_keyspace('ks') - result = client.get_slice('k1', ColumnParent(cf), SlicePredicate(slice_range=SliceRange('', '', False, 5)), ConsistencyLevel.ONE) + result = client.get_slice('k1'.encode(), ColumnParent(cf), SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 5)), ConsistencyLevel.ONE) assert_length_equal(result, 2) - assert result[0].super_column.name == 'key1' - assert result[1].super_column.name == 'key2' + assert result[0].super_column.name == 'key1'.encode() + assert result[1].super_column.name == 'key2'.encode() for cosc in result: - assert cosc.super_column.columns[0].name == 'col1' + assert cosc.super_column.columns[0].name == 'col1'.encode() 
assert cosc.super_column.columns[0].value == _i64(200) - assert cosc.super_column.columns[1].name == 'col2' + assert cosc.super_column.columns[1].name == 'col2'.encode() assert cosc.super_column.columns[1].value == _i64(300) - assert cosc.super_column.columns[2].name == 'value1' + assert cosc.super_column.columns[2].name == 'value1'.encode() assert cosc.super_column.columns[2].value == _i64(100) -def _validate_dense_cql(cursor, cf='dense_super_1', key='key', column1='column1', column2='column2', value='value'): +def _validate_dense_cql(cursor, cf='dense_super_1', key='key', column1='column1', column2='column2', value='value', is_version_4_or_greater=False): cursor.execute('use ks') - assert (list(cursor.execute("SELECT * FROM {}".format(cf))) == - [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}, + expected = [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}, {key: 'k1', column1: 'key2', column2: 100, value: 'value1'}, {key: 'k2', column1: 'key1', column2: 200, value: 'value2'}, - {key: 'k2', column1: 'key2', column2: 200, value: 'value2'}]) - - assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))) == - [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}, - {key: 'k1', column1: 'key2', column2: 100, value: 'value1'}]) - - assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1'".format(cf, key, column1))) == - [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}]) - - assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1' AND {} = 100".format(cf, key, column1, column2))) == - [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}]) + {key: 'k2', column1: 'key2', column2: 200, value: 'value2'}] + if is_version_4_or_greater: + expected[0][100]='value1' + expected[1][100]='value1' + expected[2][200]='value2' + expected[3][200]='value2' + for dict in expected: + del dict[value] + for dict in expected: + del dict[column2] + result = 
unpack(list(cursor.execute("SELECT * FROM {}".format(cf)))) + assert_lists_of_dicts_equal(result, expected) + + result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key)))) + expected = [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}, + {key: 'k1', column1: 'key2', column2: 100, value: 'value1'}] + if is_version_4_or_greater: + expected[0][100]='value1' + expected[1][100]='value1' + for dict in expected: + del dict[value] + for dict in expected: + del dict[column2] + assert_lists_of_dicts_equal(result, expected) + + result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1'".format(cf, key, column1)))) + expected = [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}] + if is_version_4_or_greater: + expected[0][100]='value1' + for dict in expected: + del dict[value] + for dict in expected: + del dict[column2] + assert_lists_of_dicts_equal(result, expected) + + if is_version_4_or_greater: + result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1' AND \"\" CONTAINS KEY 100 ALLOW FILTERING".format(cf, key, column1, column2)))) + else: + result = list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1' AND {} = 100".format(cf, key, column1, column2))) + + expected = [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}] + if is_version_4_or_greater: + expected[0][100]='value1' + for dict in expected: + del dict[value] + for dict in expected: + del dict[column2] + assert_lists_of_dicts_equal(result, expected) def _validate_dense_thrift(client, cf='dense_super_1'): - client.transport.open() + try: + client.transport.open() + except: + pass client.set_keyspace('ks') - result = client.get_slice('k1', ColumnParent(cf), SlicePredicate(slice_range=SliceRange('', '', False, 5)), ConsistencyLevel.ONE) + result = client.get_slice('k1'.encode(), ColumnParent(cf), SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 5)), 
ConsistencyLevel.ONE) assert_length_equal(result, 2) - assert result[0].super_column.name == 'key1' - assert result[1].super_column.name == 'key2' + assert result[0].super_column.name == 'key1'.encode() + assert result[1].super_column.name == 'key2'.encode() print((result[0])) print((result[1])) for cosc in result: assert cosc.super_column.columns[0].name == _i64(100) - assert cosc.super_column.columns[0].value == 'value1' + assert cosc.super_column.columns[0].value == 'value1'.encode() @pytest.mark.upgrade_test @@ -124,6 +207,9 @@ class TestUpgradeSuperColumnsThrough(Tester): node.set_configuration_options(values={'start_rpc': 'true'}) logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) self.cluster.set_install_dir(version=tag) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() + for node in nodes: + node.set_configuration_options(values={'start_rpc': 'true'}) # Restart nodes on new version for node in nodes: @@ -138,6 +224,7 @@ class TestUpgradeSuperColumnsThrough(Tester): # Forcing cluster version on purpose cluster.set_install_dir(version=cassandra_version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() cluster.populate(num_nodes) for node in self.cluster.nodelist(): @@ -159,24 +246,30 @@ class TestUpgradeSuperColumnsThrough(Tester): client.transport.open() client.set_keyspace('ks') - client.system_add_column_family(_create_dense_super_cf('dense_super_1')) + _create_dense_super_cf(client, 'dense_super_1') for i in range(1, 3): - client.insert('k1', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(100), 'value1'.encode(), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), 
ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(200), 'value2'.encode(), 0), ConsistencyLevel.ONE) _validate_dense_thrift(client, cf='dense_super_1') - node.stop() self.set_node_to_current_version(node) - node.set_configuration_options(values={'start_rpc': 'true'}) + #4.0 doesn't support compact storage + if node.get_cassandra_version() >= '4': + cursor.execute("ALTER TABLE ks.dense_super_1 DROP COMPACT STORAGE;") + + node.stop() + if node.get_cassandra_version() < '4': + node.set_configuration_options(values={'start_rpc': 'true'}) node.start() cursor = self.patient_cql_connection(node, row_factory=dict_factory) - client = get_thrift_client(host, port) - _validate_dense_thrift(client, cf='dense_super_1') - _validate_dense_cql(cursor, cf='dense_super_1') + if node.get_cassandra_version() < '4': + client = get_thrift_client(host, port) + _validate_dense_thrift(client, cf='dense_super_1') + _validate_dense_cql(cursor, cf='dense_super_1', is_version_4_or_greater=node.get_cassandra_version() >= '4') def test_dense_supercolumn(self): cluster = self.prepare() @@ -192,11 +285,11 @@ class TestUpgradeSuperColumnsThrough(Tester): client.transport.open() client.set_keyspace('ks') - client.system_add_column_family(_create_dense_super_cf('dense_super_1')) + _create_dense_super_cf(client, 'dense_super_1') for i in range(1, 3): - client.insert('k1', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(100), 'value1'.encode(), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(200), 'value2'.encode(), 0), ConsistencyLevel.ONE) _validate_dense_thrift(client, cf='dense_super_1') _validate_dense_cql(cursor, 
cf='dense_super_1') @@ -208,16 +301,22 @@ class TestUpgradeSuperColumnsThrough(Tester): _validate_dense_thrift(client, cf='dense_super_1') - node.stop() self.set_node_to_current_version(node) - node.set_configuration_options(values={'start_rpc': 'true'}) + #4.0 doesn't support compact storage + if node.get_cassandra_version() >= '4': + cursor.execute("ALTER TABLE ks.dense_super_1 DROP COMPACT STORAGE;") + + node.stop() + if node.get_cassandra_version() < '4': + node.set_configuration_options(values={'start_rpc': 'true'}) node.start() - cursor = self.patient_cql_connection(node, row_factory=dict_factory) - client = get_thrift_client(host, port) + if node.get_cassandra_version() < '4': + client = get_thrift_client(host, port) + _validate_dense_thrift(client, cf='dense_super_1') - _validate_dense_thrift(client, cf='dense_super_1') - _validate_dense_cql(cursor, cf='dense_super_1') + cursor = self.patient_cql_connection(node, row_factory=dict_factory) + _validate_dense_cql(cursor, cf='dense_super_1', is_version_4_or_greater=node.get_cassandra_version() >= '4') def test_sparse_supercolumn(self): cluster = self.prepare() @@ -233,17 +332,16 @@ class TestUpgradeSuperColumnsThrough(Tester): client.transport.open() client.set_keyspace('ks') - cf = _create_sparse_super_cf('sparse_super_2') - client.system_add_column_family(cf) + _create_sparse_super_cf(client, 'sparse_super_2') for i in range(1, 3): - client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value1", _i64(100), 0), ConsistencyLevel.ONE) - client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE) - client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("value1".encode(), _i64(100), 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), 
ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value2", _i64(100), 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("value2".encode(), _i64(100), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE) _validate_sparse_thrift(client, cf='sparse_super_2') _validate_sparse_cql(cursor, cf='sparse_super_2') @@ -255,20 +353,27 @@ class TestUpgradeSuperColumnsThrough(Tester): _validate_sparse_thrift(client, cf='sparse_super_2') - node.stop() self.set_node_to_current_version(node) - node.set_configuration_options(values={'start_rpc': 'true'}) + is_version_4_or_greater = node.get_cassandra_version() >= '4' + #4.0 doesn't support compact storage + if is_version_4_or_greater: + cursor.execute("ALTER TABLE ks.sparse_super_2 DROP COMPACT STORAGE;") + + node.stop() + if not is_version_4_or_greater: + node.set_configuration_options(values={'start_rpc': 'true'}) node.start() - cursor = self.patient_cql_connection(node, row_factory=dict_factory) - client = get_thrift_client(host, port) + if not is_version_4_or_greater: + client = get_thrift_client(host, port) + _validate_sparse_thrift(client, 
cf='sparse_super_2') - _validate_sparse_thrift(client, cf='sparse_super_2') - _validate_sparse_cql(cursor, cf='sparse_super_2') + cursor = self.patient_cql_connection(node, row_factory=dict_factory) + _validate_sparse_cql(cursor, cf='sparse_super_2', is_version_4_or_greater=is_version_4_or_greater) @pytest.mark.upgrade_test -@since('2.1', max_version='4.0.0') +@since('2.1', max_version='3.99') class TestThrift(UpgradeTester): """ Verify dense and sparse supercolumn functionality with and without renamed columns @@ -289,20 +394,27 @@ class TestThrift(UpgradeTester): client.transport.open() client.set_keyspace('ks') - client.system_add_column_family(_create_dense_super_cf('dense_super_1')) + _create_dense_super_cf(client, 'dense_super_1') for i in range(1, 3): - client.insert('k1', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(100), 'value1'.encode(), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(200), 'value2'.encode(), 0), ConsistencyLevel.ONE) _validate_dense_cql(cursor) _validate_dense_thrift(client) + version_string = self.upgrade_version_string() + is_version_4_or_greater = version_string == 'trunk' or version_string >= '4.0' + #4.0 doesn't support compact storage + if is_version_4_or_greater: + cursor.execute("ALTER TABLE ks.dense_super_1 DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) - client = get_thrift_client(host, port) - _validate_dense_cql(cursor) - _validate_dense_thrift(client) + if not is_version_4_or_greater: + client = 
get_thrift_client(host, port) + _validate_dense_thrift(client) + _validate_dense_cql(cursor, is_version_4_or_greater=is_version_4_or_greater) def test_dense_supercolumn_with_renames(self): cursor = self.prepare(row_factory=dict_factory) @@ -317,11 +429,11 @@ class TestThrift(UpgradeTester): client.transport.open() client.set_keyspace('ks') - client.system_add_column_family(_create_dense_super_cf('dense_super_2')) + _create_dense_super_cf(client, 'dense_super_2') for i in range(1, 3): - client.insert('k1', ColumnParent('dense_super_2', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('dense_super_2', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('dense_super_2', 'key{}'.format(i).encode()), Column(_i64(100), 'value1'.encode(), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('dense_super_2', 'key{}'.format(i).encode()), Column(_i64(200), 'value2'.encode(), 0), ConsistencyLevel.ONE) cursor.execute("ALTER TABLE ks.dense_super_2 RENAME key TO renamed_key") cursor.execute("ALTER TABLE ks.dense_super_2 RENAME column1 TO renamed_column1") @@ -331,11 +443,18 @@ class TestThrift(UpgradeTester): _validate_dense_cql(cursor, cf='dense_super_2', key='renamed_key', column1='renamed_column1', column2='renamed_column2', value='renamed_value') _validate_dense_thrift(client, cf='dense_super_2') + version_string = self.upgrade_version_string() + is_version_4_or_greater = version_string == 'trunk' or version_string >= '4.0' + #4.0 doesn't support compact storage + if is_version_4_or_greater: + cursor.execute("ALTER TABLE ks.dense_super_2 DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) - client = get_thrift_client(host, port) - _validate_dense_cql(cursor, cf='dense_super_2', 
key='renamed_key', column1='renamed_column1', column2='renamed_column2', value='renamed_value') - _validate_dense_thrift(client, cf='dense_super_2') + if not is_version_4_or_greater: + client = get_thrift_client(host, port) + _validate_dense_thrift(client, cf='dense_super_2') + _validate_dense_cql(cursor, cf='dense_super_2', key='renamed_key', column1='renamed_column1', column2='renamed_column2', value='renamed_value', is_version_4_or_greater=is_version_4_or_greater) def test_sparse_supercolumn_with_renames(self): cursor = self.prepare(row_factory=dict_factory) @@ -350,29 +469,35 @@ class TestThrift(UpgradeTester): client.transport.open() client.set_keyspace('ks') - cf = _create_sparse_super_cf('sparse_super_1') - client.system_add_column_family(cf) + _create_sparse_super_cf(client, 'sparse_super_1') cursor.execute("ALTER TABLE ks.sparse_super_1 RENAME key TO renamed_key") cursor.execute("ALTER TABLE ks.sparse_super_1 RENAME column1 TO renamed_column1") for i in range(1, 3): - client.insert('k1', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("value1", _i64(100), 0), ConsistencyLevel.ONE) - client.insert('k1', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE) - client.insert('k1', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("value1".encode(), _i64(100), 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("value2", _i64(100), 0), ConsistencyLevel.ONE) - client.insert('k2', 
ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("value2".encode(), _i64(100), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE) _validate_sparse_thrift(client) _validate_sparse_cql(cursor, column1='renamed_column1', key='renamed_key') + version_string = self.upgrade_version_string() + is_version_4_or_greater = version_string == 'trunk' or version_string >= '4.0' + #4.0 doesn't support compact storage + if is_version_4_or_greater: + cursor.execute("ALTER TABLE ks.sparse_super_1 DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) - client = get_thrift_client(host, port) - _validate_sparse_cql(cursor, column1='renamed_column1', key='renamed_key') - _validate_sparse_thrift(client) + if not is_version_4_or_greater: + client = get_thrift_client(host, port) + _validate_sparse_thrift(client) + _validate_sparse_cql(cursor, column1='renamed_column1', key='renamed_key', is_version_4_or_greater=is_version_4_or_greater) def test_sparse_supercolumn(self): cursor = self.prepare(row_factory=dict_factory) @@ -387,26 +512,32 @@ class TestThrift(UpgradeTester): client.transport.open() client.set_keyspace('ks') - cf = _create_sparse_super_cf('sparse_super_2') - client.system_add_column_family(cf) + _create_sparse_super_cf(client, 'sparse_super_2') for i in range(1, 3): - client.insert('k1', 
ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value1", _i64(100), 0), ConsistencyLevel.ONE) - client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE) - client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("value1".encode(), _i64(100), 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE) + client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value2", _i64(100), 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE) - client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("value2".encode(), _i64(100), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE) + client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE) _validate_sparse_thrift(client, cf='sparse_super_2') _validate_sparse_cql(cursor, cf='sparse_super_2') + version_string = self.upgrade_version_string() + is_version_4_or_greater = version_string == 'trunk' or version_string >= '4.0' + #4.0 doesn't support compact storage + if is_version_4_or_greater: + cursor.execute("ALTER TABLE ks.sparse_super_2 
DROP COMPACT STORAGE;") + for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True): logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old")) - client = get_thrift_client(host, port) - _validate_sparse_thrift(client, cf='sparse_super_2') - _validate_sparse_cql(cursor, cf='sparse_super_2') + if not is_version_4_or_greater: + client = get_thrift_client(host, port) + _validate_sparse_thrift(client, cf='sparse_super_2') + _validate_sparse_cql(cursor, cf='sparse_super_2', is_version_4_or_greater=is_version_4_or_greater) topology_specs = [ @@ -427,6 +558,7 @@ for spec in specs: assert gen_class_name not in globals() upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or spec['UPGRADE_PATH'].upgrade_meta.matches_current_env_version_family + cls = type(gen_class_name, (TestThrift,), spec) if not upgrade_applies_to_env: - pytest.mark.skip(reason='test not applicable to env.') - globals()[gen_class_name] = type(gen_class_name, (TestThrift,), spec) + add_skip(cls, 'test not applicable to env.') + globals()[gen_class_name] = cls diff --git a/upgrade_tests/upgrade_base.py b/upgrade_tests/upgrade_base.py index a403835..b831084 100644 --- a/upgrade_tests/upgrade_base.py +++ b/upgrade_tests/upgrade_base.py @@ -70,7 +70,7 @@ class UpgradeTester(Tester, metaclass=ABCMeta): super(UpgradeTester, self).setUp() def prepare(self, ordered=False, create_keyspace=True, use_cache=False, use_thrift=False, - nodes=None, rf=None, protocol_version=None, cl=None, **kwargs): + nodes=None, rf=None, protocol_version=None, cl=None, extra_config_options=None, **kwargs): nodes = self.NODES if nodes is None else nodes rf = self.RF if rf is None else rf @@ -83,6 +83,9 @@ class UpgradeTester(Tester, metaclass=ABCMeta): cluster = self.cluster + cluster.set_install_dir(version=self.UPGRADE_PATH.starting_version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() + if ordered: 
cluster.set_partitioner("org.apache.cassandra.dht.ByteOrderedPartitioner") @@ -98,9 +101,11 @@ class UpgradeTester(Tester, metaclass=ABCMeta): cluster.set_configuration_options(values={'internode_compression': 'none'}) + if extra_config_options: + cluster.set_configuration_options(values=extra_config_options) + cluster.populate(nodes) node1 = cluster.nodelist()[0] - cluster.set_install_dir(version=self.UPGRADE_PATH.starting_version) self.fixture_dtest_setup.enable_for_jolokia = kwargs.pop('jolokia', False) if self.fixture_dtest_setup.enable_for_jolokia: remove_perf_disable_shared_mem(node1) @@ -131,7 +136,8 @@ class UpgradeTester(Tester, metaclass=ABCMeta): node1 = self.cluster.nodelist()[0] node2 = self.cluster.nodelist()[1] - # stop the nodes + # stop the nodes, this can fail due to https://issues.apache.org/jira/browse/CASSANDRA-8220 on MacOS + # for the tests that run against 2.0. You will need to run those in Linux. node1.drain() node1.stop(gently=True) @@ -165,7 +171,7 @@ class UpgradeTester(Tester, metaclass=ABCMeta): node1.set_log_level(logging.getLevelName(logging.root.level)) node1.set_configuration_options(values={'internode_compression': 'none'}) - if use_thrift: + if use_thrift and node1.get_cassandra_version() < '4': node1.set_configuration_options(values={'start_rpc': 'true'}) if self.fixture_dtest_setup.enable_for_jolokia: @@ -245,3 +251,19 @@ class UpgradeTester(Tester, metaclass=ABCMeta): '') ) assert self.UPGRADE_PATH is not None, no_upgrade_path_error + + def upgrade_version_string(self): + """ + Returns a hopefully useful version string that can be compared + to tune test behavior. 
For trunk this returns trunk, for an earlier + version like github:apache/cassandra-3.11 it returns a version number + as a string + :return: + """ + version_string = self.UPGRADE_PATH.upgrade_version + if version_string.startswith('github'): + version_string = version_string.partition('/')[2] + if "-" in version_string: + version_string = version_string.partition('-')[2] + return version_string + diff --git a/upgrade_tests/upgrade_compact_storage.py b/upgrade_tests/upgrade_compact_storage.py index ed85515..f3fe12a 100644 --- a/upgrade_tests/upgrade_compact_storage.py +++ b/upgrade_tests/upgrade_compact_storage.py @@ -34,6 +34,7 @@ class TestUpgradeSuperColumnsThrough(Tester): node.set_install_dir(version=tag) logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) self.cluster.set_install_dir(version=tag) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() # Restart nodes on new version for node in nodes: @@ -45,6 +46,7 @@ class TestUpgradeSuperColumnsThrough(Tester): # Forcing cluster version on purpose cluster.set_install_dir(version=cassandra_version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() cluster.populate(num_nodes) diff --git a/upgrade_tests/upgrade_manifest.py b/upgrade_tests/upgrade_manifest.py index d26fd34..1cd9b2c 100644 --- a/upgrade_tests/upgrade_manifest.py +++ b/upgrade_tests/upgrade_manifest.py @@ -4,22 +4,73 @@ from collections import namedtuple from dtest import RUN_STATIC_UPGRADE_MATRIX +import ccmlib.repository +from ccmlib.common import get_version_from_build + +from enum import Enum + logger = logging.getLogger(__name__) # UpgradePath's contain data about upgrade paths we wish to test # They also contain VersionMeta's for each version the path is testing UpgradePath = namedtuple('UpgradePath', ('name', 'starting_version', 'upgrade_version', 'starting_meta', 'upgrade_meta')) +VERSION_FAMILY = None +CONFIG = None + + +def is_same_family_current_to_indev(origin, 
destination): + """ + Within a version family it is useful to test that a prior release can upgrade to the indev version + """ + return origin.family == destination.family and origin.variant == "current" and destination.variant == "indev" + + +class VersionSelectionStrategies(Enum): + """ + Test upgrading from indev -> indev, current -> current across versions, and current -> indev within a version + """ + BOTH=(lambda origin, destination: (origin.variant == destination.variant) or is_same_family_current_to_indev(origin,destination)) + """ + Exclusively test in development branches so your bug fixes show up + """ + INDEV=(lambda origin, destination: origin.variant == 'indev' and destination.variant == 'indev' or is_same_family_current_to_indev(origin, destination),) + """ + Test upgrading from releases to the latest release as well as from the current release to the indev tip + within the same version. + """ + RELEASES=(lambda origin, destination: not VersionSelectionStrategies.INDEV.value[0](origin, destination) or is_same_family_current_to_indev(origin, destination),) + + +def set_config(config): + global CONFIG + CONFIG = config + set_version_family() -def _get_version_family(): + +def set_version_family(): """ Detects the version family (line) using dtest.py:CASSANDRA_VERSION_FROM_BUILD """ # todo CASSANDRA-14421 # current_version = CASSANDRA_VERSION_FROM_BUILD - current_version = '4.0' + # There are times when we want to know the C* version we're testing against + # before we call Tester.setUp. In the general case, we can't know that -- the + # test method could use any version it wants for self.cluster. However, we can + # get the version from build.xml in the C* repository specified by + # CASSANDRA_VERSION or CASSANDRA_DIR. This should use the same resolution + # strategy as the actual checkout code in Tester.setUp; if it does not, that is + # a bug. 
+ cassandra_version_slug = CONFIG.getoption("--cassandra-version") + cassandra_dir = CONFIG.getoption("--cassandra-dir") + # Prefer CASSANDRA_VERSION if it's set in the environment. If not, use CASSANDRA_DIR + if cassandra_version_slug: + # fetch but don't build the specified C* version + ccm_repo_cache_dir, _ = ccmlib.repository.setup(cassandra_version_slug) + current_version = get_version_from_build(ccm_repo_cache_dir) + else: + current_version = get_version_from_build(cassandra_dir) - version_family = 'unknown' if current_version.vstring.startswith('2.0'): version_family = '2.0.x' elif current_version.vstring.startswith('2.1'): @@ -36,10 +87,9 @@ def _get_version_family(): # when this occurs, it's time to update this manifest a bit! raise RuntimeError("4.1+ not yet supported on upgrade tests!") - return version_family - - -VERSION_FAMILY = _get_version_family() + global VERSION_FAMILY + VERSION_FAMILY = version_family + logger.info("Setting version family to %s\n" % VERSION_FAMILY) class VersionMeta(namedtuple('_VersionMeta', ('name', 'family', 'variant', 'version', 'min_proto_v', 'max_proto_v', 'java_versions'))): @@ -70,22 +120,19 @@ class VersionMeta(namedtuple('_VersionMeta', ('name', 'family', 'variant', 'vers return self -indev_2_0_x = None # None if release not likely -current_2_0_x = VersionMeta(name='current_2_0_x', family='2.0.x', variant='current', version='2.0.17', min_proto_v=1, max_proto_v=2, java_versions=(7,)) - -indev_2_1_x = VersionMeta(name='indev_2_1_x', family='2.1.x', variant='indev', version='github:apache/cassandra-2.1', min_proto_v=1, max_proto_v=3, java_versions=(7, 8)) -current_2_1_x = VersionMeta(name='current_2_1_x', family='2.1.x', variant='current', version='2.1.17', min_proto_v=1, max_proto_v=3, java_versions=(7, 8)) +indev_2_1_x = VersionMeta(name='indev_2_1_x', family='2.1', variant='indev', version='github:apache/cassandra-2.1', min_proto_v=1, max_proto_v=3, java_versions=(7, 8)) +current_2_1_x = 
VersionMeta(name='current_2_1_x', family='2.1', variant='current', version='2.1.20', min_proto_v=1, max_proto_v=3, java_versions=(7, 8)) -indev_2_2_x = VersionMeta(name='indev_2_2_x', family='2.2.x', variant='indev', version='github:apache/cassandra-2.2', min_proto_v=1, max_proto_v=4, java_versions=(7, 8)) -current_2_2_x = VersionMeta(name='current_2_2_x', family='2.2.x', variant='current', version='2.2.9', min_proto_v=1, max_proto_v=4, java_versions=(7, 8)) +indev_2_2_x = VersionMeta(name='indev_2_2_x', family='2.2', variant='indev', version='github:apache/cassandra-2.2', min_proto_v=1, max_proto_v=4, java_versions=(7, 8)) +current_2_2_x = VersionMeta(name='current_2_2_x', family='2.2', variant='current', version='2.2.13', min_proto_v=1, max_proto_v=4, java_versions=(7, 8)) -indev_3_0_x = VersionMeta(name='indev_3_0_x', family='3.0.x', variant='indev', version='github:apache/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,)) -current_3_0_x = VersionMeta(name='current_3_0_x', family='3.0.x', variant='current', version='3.0.12', min_proto_v=3, max_proto_v=4, java_versions=(8,)) +indev_3_0_x = VersionMeta(name='indev_3_0_x', family='3.0', variant='indev', version='github:apache/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,)) +current_3_0_x = VersionMeta(name='current_3_0_x', family='3.0', variant='current', version='3.0.17', min_proto_v=3, max_proto_v=4, java_versions=(8,)) -indev_3_x = VersionMeta(name='indev_3_x', family='3.x', variant='indev', version='github:apache/cassandra-3.11', min_proto_v=3, max_proto_v=4, java_versions=(8,)) -current_3_x = VersionMeta(name='current_3_x', family='3.x', variant='current', version='3.10', min_proto_v=3, max_proto_v=4, java_versions=(8,)) +indev_3_11_x = VersionMeta(name='indev_3_11_x', family='3.11', variant='indev', version='github:apache/cassandra-3.11', min_proto_v=3, max_proto_v=4, java_versions=(8,)) +current_3_11_x = VersionMeta(name='current_3_11_x', family='3.11', variant='current', 
version='github:apache/cassandra-3.11', min_proto_v=3, max_proto_v=4, java_versions=(8,)) -indev_trunk = VersionMeta(name='indev_trunk', family='trunk', variant='indev', version='github:apache/trunk', min_proto_v=3, max_proto_v=4, java_versions=(8,)) +indev_trunk = VersionMeta(name='indev_trunk', family='trunk', variant='indev', version='github:apache/trunk', min_proto_v=4, max_proto_v=5, java_versions=(8,)) # MANIFEST maps a VersionMeta representing a line/variant to a list of other VersionMeta's representing supported upgrades @@ -96,20 +143,17 @@ indev_trunk = VersionMeta(name='indev_trunk', family='trunk', variant='indev', v # 3) Nodes upgraded to version B can read data stored by the predecessor version A, and from a data standpoint will function the same as if they always ran version B. # 4) If a new sstable format is present in version B, writes will occur in that format after upgrade. Running sstableupgrade on version B will proactively convert version A sstables to version B. MANIFEST = { - indev_2_0_x: [indev_2_1_x, current_2_1_x], - current_2_0_x: [indev_2_0_x, indev_2_1_x, current_2_1_x], + indev_2_1_x: [indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x], + current_2_1_x: [indev_2_1_x, indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x], - indev_2_1_x: [indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_x, current_3_x], - current_2_1_x: [indev_2_1_x, indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_x, current_3_x], + indev_2_2_x: [indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x], + current_2_2_x: [indev_2_2_x, indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x], - indev_2_2_x: [indev_3_0_x, current_3_0_x, indev_3_x, current_3_x], - current_2_2_x: [indev_2_2_x, indev_3_0_x, current_3_0_x, indev_3_x, current_3_x], + indev_3_0_x: [indev_3_11_x, current_3_11_x], + current_3_0_x: [indev_3_0_x, indev_3_11_x, current_3_11_x, indev_trunk], - 
indev_3_0_x: [indev_3_x, current_3_x], - current_3_0_x: [indev_3_0_x, indev_3_x, current_3_x, indev_trunk], - - current_3_x: [indev_3_x, indev_trunk], - indev_3_x: [indev_trunk] + current_3_11_x: [indev_3_11_x, indev_trunk], + indev_3_11_x: [indev_trunk] } # Local env and custom path testing instructions. Use these steps to REPLACE the normal upgrade test cases with your own. @@ -137,23 +181,6 @@ def _have_common_proto(origin_meta, destination_meta): """ return origin_meta.max_proto_v >= destination_meta.min_proto_v - -def _is_targeted_variant_combo(origin_meta, destination_meta): - """ - Takes two VersionMeta objects, in order of test from start version to next version. - Returns a boolean indicating if this is a test pair we care about. - - for now we only test upgrades of these types: - current -> in-dev (aka: released -> branch) - """ - # if we're overriding the test manifest, we don't want to filter anything out - if OVERRIDE_MANIFEST: - return True - - # is this an upgrade variant combination we care about? - return (origin_meta.variant == 'current' and destination_meta.variant == 'indev') - - def build_upgrade_pairs(): """ Using the manifest (above), builds a set of valid upgrades, according to current testing practices. 
@@ -163,14 +190,16 @@ def build_upgrade_pairs(): valid_upgrade_pairs = [] manifest = OVERRIDE_MANIFEST or MANIFEST + configured_strategy = CONFIG.getoption("--upgrade-version-selection").upper() + version_select_strategy = VersionSelectionStrategies[configured_strategy].value[0] + for origin_meta, destination_metas in list(manifest.items()): for destination_meta in destination_metas: - if not (origin_meta and destination_meta): # None means we don't care about that version, which means we don't care about iterations involving it either - logger.debug("skipping class creation as a version is undefined (this is normal), versions: {} and {}".format(origin_meta, destination_meta)) + if not version_select_strategy(origin_meta, destination_meta): continue - if not _is_targeted_variant_combo(origin_meta, destination_meta): - logger.debug("skipping class creation, no testing of '{}' to '{}' (for {} upgrade to {})".format(origin_meta.variant, destination_meta.variant, origin_meta.name, destination_meta.name)) + if not (origin_meta and destination_meta): # None means we don't care about that version, which means we don't care about iterations involving it either + logger.debug("skipping class creation as a version is undefined (this is normal), versions: {} and {}".format(origin_meta, destination_meta)) continue if not _have_common_proto(origin_meta, destination_meta): diff --git a/upgrade_tests/upgrade_schema_agreement_test.py b/upgrade_tests/upgrade_schema_agreement_test.py index 13ff744..7b07a81 100644 --- a/upgrade_tests/upgrade_schema_agreement_test.py +++ b/upgrade_tests/upgrade_schema_agreement_test.py @@ -52,6 +52,7 @@ class TestSchemaAgreementUpgrade(Tester): # Forcing cluster version on purpose cluster.set_install_dir(version=version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() cluster.populate(num_nodes).start() return cluster @@ -145,6 +146,7 @@ class TestSchemaAgreementUpgrade(Tester): logger.debug("") logger.debug("Upgrading cluster 
to {}".format(version)) cluster.set_install_dir(version=version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() for node in nodes: other_nodes = [n for n in nodes if n != node] diff --git a/upgrade_tests/upgrade_supercolumns_test.py b/upgrade_tests/upgrade_supercolumns_test.py index e216f5c..40ac11c 100644 --- a/upgrade_tests/upgrade_supercolumns_test.py +++ b/upgrade_tests/upgrade_supercolumns_test.py @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) # The data contained in the SSTables is (name, {'attr': {'name': name}}) for the name in NAMES. SCHEMA_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "schema-2.0.cql") TABLES_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "supcols", "cols") -NAMES = ["Alice", "Bob", "Claire", "Dave", "Ed", "Frank", "Grace"] +NAMES = [name.encode() for name in ["Alice", "Bob", "Claire", "Dave", "Ed", "Frank", "Grace"]] @pytest.mark.upgrade_test @@ -55,6 +55,7 @@ class TestSCUpgrade(Tester): # Forcing cluster version on purpose cluster.set_install_dir(version=cassandra_version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() if "memtable_allocation_type" in cluster._config_options: del cluster._config_options['memtable_allocation_type'] cluster.populate(num_nodes).start() @@ -71,7 +72,7 @@ class TestSCUpgrade(Tester): client = get_thrift_client(host, port) client.transport.open() client.set_keyspace('supcols') - p = SlicePredicate(slice_range=SliceRange('', '', False, 1000)) + p = SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 1000)) for name in NAMES: super_col_value = client.get_slice(name, ColumnParent("cols"), p, ConsistencyLevel.ONE) logger.debug("get_slice(%s) returned %s" % (name, super_col_value)) @@ -79,7 +80,7 @@ class TestSCUpgrade(Tester): def verify_with_cql(self, session): session.execute("USE supcols") - expected = [[name, 'attr', 'name', name] for name in ['Grace', 'Claire', 
'Dave', 'Frank', 'Ed', 'Bob', 'Alice']] + expected = [[name.encode(), 'attr'.encode(), 'name', name.encode()] for name in ['Grace', 'Claire', 'Dave', 'Frank', 'Ed', 'Bob', 'Alice']] assert_all(session, "SELECT * FROM cols", expected) def _upgrade_super_columns_through_versions_test(self, upgrade_path): @@ -123,15 +124,17 @@ class TestSCUpgrade(Tester): self.verify_with_thrift() for version in upgrade_path: + if version == 'git:cassandra-4.0' or version == 'git:trunk': + session.execute("ALTER TABLE supcols.cols DROP COMPACT STORAGE") self.upgrade_to_version(version) - if self.cluster.version() < '4': - node1.nodetool("enablethrift") - session = self.patient_exclusive_cql_connection(node1) self.verify_with_cql(session) - self.verify_with_thrift() + + if self.cluster.version() < '4': + node1.nodetool("enablethrift") + self.verify_with_thrift() cluster.remove(node=node1) @@ -156,11 +159,13 @@ class TestSCUpgrade(Tester): # Update Cassandra Directory for node in nodes: node.set_install_dir(version=tag) + logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) + self.cluster.set_install_dir(version=tag) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() + for node in nodes: if tag < "2.1": if "memtable_allocation_type" in node.config_options: node.config_options.__delitem__("memtable_allocation_type") - logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) - self.cluster.set_install_dir(version=tag) # Restart nodes on new version for node in nodes: diff --git a/upgrade_tests/upgrade_through_versions_test.py b/upgrade_tests/upgrade_through_versions_test.py index 397ea15..41bb7ec 100644 --- a/upgrade_tests/upgrade_through_versions_test.py +++ b/upgrade_tests/upgrade_through_versions_test.py @@ -19,9 +19,10 @@ from cassandra.query import SimpleStatement from dtest import RUN_STATIC_UPGRADE_MATRIX, Tester from tools.misc import generate_ssl_stores, new_node from .upgrade_base import 
switch_jdks -from .upgrade_manifest import (build_upgrade_pairs, current_2_0_x, - current_2_1_x, current_2_2_x, current_3_0_x, - indev_2_2_x, indev_3_x) +from .upgrade_manifest import (build_upgrade_pairs, + current_2_1_x, current_2_2_x, current_3_0_x, + indev_3_11_x, + current_3_11_x, indev_trunk) logger = logging.getLogger(__name__) @@ -67,7 +68,7 @@ def data_writer(tester, to_verify_queue, verification_done_queue, rewrite_probab session.execute(prepared, (val, key)) - to_verify_queue.put_nowait((key, val,)) + to_verify_queue.put((key, val,)) except Exception: logger.debug("Error in data writer process!") to_verify_queue.close() @@ -122,7 +123,7 @@ def data_checker(tester, to_verify_queue, verification_done_queue): # rewrite rows in the same sequence as originally written pass - tester.assertEqual(expected_val, actual_val, "Data did not match expected value!") + assert expected_val == actual_val, "Data did not match expected value!" def counter_incrementer(tester, to_verify_queue, verification_done_queue, rewrite_probability=0): @@ -225,6 +226,7 @@ def counter_checker(tester, to_verify_queue, verification_done_queue): @pytest.mark.upgrade_test @pytest.mark.resource_intensive +@pytest.mark.skip("Fake skip so that this isn't run outside of a generated class that removes this annotation") class TestUpgrade(Tester): """ Upgrades a 3-node Murmur3Partitioner cluster through versions specified in test_version_metas. @@ -247,13 +249,13 @@ class TestUpgrade(Tester): r'Unknown column cdc during deserialization', ) - def setUp(self): + def prepare(self): logger.debug("Upgrade test beginning, setting CASSANDRA_VERSION to {}, and jdk to {}. (Prior values will be restored after test)." 
.format(self.test_version_metas[0].version, self.test_version_metas[0].java_version)) - os.environ['CASSANDRA_VERSION'] = self.test_version_metas[0].version + cluster = self.cluster + cluster.set_install_dir(version=self.test_version_metas[0].version) switch_jdks(self.test_version_metas[0].java_version) - - super(TestUpgrade, self).setUp() + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() logger.debug("Versions to test (%s): %s" % (type(self), str([v.version for v in self.test_version_metas]))) def init_config(self): @@ -273,6 +275,7 @@ class TestUpgrade(Tester): """ self.upgrade_scenario() + @pytest.mark.timeout(3000) def test_rolling_upgrade(self): """ Test rolling upgrade of the cluster, so we have mixed versions part way through. @@ -285,6 +288,7 @@ class TestUpgrade(Tester): """ self.upgrade_scenario(internode_ssl=True) + @pytest.mark.timeout(3000) def test_rolling_upgrade_with_internode_ssl(self): """ Rolling upgrade test using internode ssl. @@ -293,6 +297,8 @@ class TestUpgrade(Tester): def upgrade_scenario(self, populate=True, create_schema=True, rolling=False, after_upgrade_call=(), internode_ssl=False): # Record the rows we write as we go: + if populate: + self.prepare() self.row_values = set() cluster = self.cluster if cluster.version() >= '3.0': @@ -350,6 +356,7 @@ class TestUpgrade(Tester): (num + 1, len(self.cluster.nodelist()), version_meta.version)) self.cluster.set_install_dir(version=version_meta.version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() # Stop write processes write_proc.terminate() @@ -367,6 +374,7 @@ class TestUpgrade(Tester): self.upgrade_to_version(version_meta, internode_ssl=internode_ssl) self.cluster.set_install_dir(version=version_meta.version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() self._check_values() self._check_counters() @@ -432,7 +440,7 @@ class TestUpgrade(Tester): for node in nodes: node.set_install_dir(version=version_meta.version) 
logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) - if internode_ssl and version_meta.version >= '4.0': + if internode_ssl and (version_meta.family == 'trunk' or version_meta.family >= '4.0'): node.set_configuration_options({'server_encryption_options': {'enabled': True, 'enable_legacy_ssl_storage_port': True}}) # hacky? yes. We could probably extend ccm to allow this publicly. @@ -553,7 +561,7 @@ class TestUpgrade(Tester): Returns the writer process, verifier process, and the to_verify_queue. """ # queue of writes to be verified - to_verify_queue = Queue() + to_verify_queue = Queue(10000) # queue of verified writes, which are update candidates verification_done_queue = Queue(maxsize=500) @@ -629,7 +637,7 @@ class TestUpgrade(Tester): if fail_count > 100: break - assert fail_count, 100 < "Too many counter increment failures" + assert fail_count < 100, "Too many counter increment failures" def _check_counters(self): logger.debug("Checking counter values...") @@ -668,7 +676,6 @@ class TestUpgrade(Tester): else: self.fail("Count query did not return") - class BootstrapMixin(object): """ Can be mixed into UpgradeTester or a subclass thereof to add bootstrap tests. 
@@ -705,6 +712,7 @@ class BootstrapMixin(object): def test_bootstrap_multidc(self): # try and add a new node # multi dc, 2 nodes in each dc + self.prepare() cluster = self.cluster if cluster.version() >= '3.0': @@ -773,13 +781,16 @@ def create_upgrade_class(clsname, version_metas, protocol_version, print(" to run these tests alone, use `nosetests {}.py:{}`".format(__name__, clsname)) upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or version_metas[-1].matches_current_env_version_family - if not upgrade_applies_to_env: - pytest.mark.skip(reason='test not applicable to env.') newcls = type( clsname, parent_classes, {'test_version_metas': version_metas, '__test__': True, 'protocol_version': protocol_version, 'extra_config': extra_config} ) + # Remove the skip annotation in the superclass we just derived from, we will add it back if we actually intend + # to skip with a better message + newcls.pytestmark = [mark for mark in newcls.pytestmark if not mark.name == "skip"] + if not upgrade_applies_to_env: + newcls.pytestmark.append(pytest.mark.skip("test not applicable to env")) if clsname in globals(): raise RuntimeError("Class by name already exists!") @@ -791,41 +802,33 @@ def create_upgrade_class(clsname, version_metas, protocol_version, MultiUpgrade = namedtuple('MultiUpgrade', ('name', 'version_metas', 'protocol_version', 'extra_config')) MULTI_UPGRADES = ( - # Proto v1 upgrades (v1 supported on 2.0, 2.1, 2.2) - MultiUpgrade(name='ProtoV1Upgrade_AllVersions_EndsAt_indev_2_2_x', - version_metas=[current_2_0_x, current_2_1_x, indev_2_2_x], protocol_version=1, extra_config=None), - MultiUpgrade(name='ProtoV1Upgrade_AllVersions_RandomPartitioner_EndsAt_indev_2_2_x', - version_metas=[current_2_0_x, current_2_1_x, indev_2_2_x], protocol_version=1, - extra_config=( - ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'), - )), - - # Proto v2 upgrades (v2 is supported on 2.0, 2.1, 2.2) - MultiUpgrade(name='ProtoV2Upgrade_AllVersions_EndsAt_indev_2_2_x', - 
version_metas=[current_2_0_x, current_2_1_x, indev_2_2_x], protocol_version=2, extra_config=None), - MultiUpgrade(name='ProtoV2Upgrade_AllVersions_RandomPartitioner_EndsAt_indev_2_2_x', - version_metas=[current_2_0_x, current_2_1_x, indev_2_2_x], protocol_version=2, - extra_config=( - ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'), - )), - - # Proto v3 upgrades (v3 is supported on 2.1, 2.2, 3.0, 3.1, trunk) - MultiUpgrade(name='ProtoV3Upgrade_AllVersions_EndsAt_Trunk_HEAD', - version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_x], protocol_version=3, extra_config=None), - MultiUpgrade(name='ProtoV3Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD', - version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_x], protocol_version=3, + # Proto v3 upgrades (v3 is supported on 2.1, 2.2, 3.0, 3.11) + MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_EndsAt_3_11_X', + version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_11_x], protocol_version=3, extra_config=None), + MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_RandomPartitioner_EndsAt_3_11_X_HEAD', + version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_11_x], protocol_version=3, extra_config=( ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'), )), # Proto v4 upgrades (v4 is supported on 2.2, 3.0, 3.1, trunk) - MultiUpgrade(name='ProtoV4Upgrade_AllVersions_EndsAt_Trunk_HEAD', - version_metas=[current_2_2_x, current_3_0_x, indev_3_x], protocol_version=4, extra_config=None), - MultiUpgrade(name='ProtoV4Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD', - version_metas=[current_2_2_x, current_3_0_x, indev_3_x], protocol_version=4, + MultiUpgrade(name='TestProtoV4Upgrade_AllVersions_EndsAt_Trunk_HEAD', + version_metas=[current_2_2_x, current_3_0_x, current_3_11_x, indev_trunk], protocol_version=4, extra_config=None), + MultiUpgrade(name='TestProtoV4Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD', + 
version_metas=[current_2_2_x, current_3_0_x, current_3_11_x, indev_trunk], protocol_version=4, extra_config=( ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'), )), + # Beta protocol versions (v5) don't work with this test because it doesn't tell the client to use beta versions. + # This is acceptable for now; revisit these entries on release. + # Proto v5 upgrades (v5 is supported on 3.0, 3.11, trunk) + # MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_EndsAt_Trunk_HEAD', + # version_metas=[current_3_0_x, current_3_11_x, indev_trunk], protocol_version=5, extra_config=None), + # MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD', + # version_metas=[current_3_0_x, current_3_11_x, indev_trunk], protocol_version=5, + # extra_config=( + # ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'), + # )), ) for upgrade in MULTI_UPGRADES: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org