This is an automated email from the ASF dual-hosted git repository. slebresne pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cassandra-dtest.git
The following commit(s) were added to refs/heads/master by this push: new f25832a Add test for the upgrade of KEYS 2i to 4.0 (CASSANDRA-15906) f25832a is described below commit f25832aef72d62e9f00a45c54b2cdc17dc9f2d7b Author: Sylvain Lebresne <lebre...@gmail.com> AuthorDate: Thu Jun 25 17:52:14 2020 +0200 Add test for the upgrade of KEYS 2i to 4.0 (CASSANDRA-15906) --- upgrade_tests/thrift_upgrade_test.py | 195 ++++++++++++++++++++++++++++++++++- 1 file changed, 190 insertions(+), 5 deletions(-) diff --git a/upgrade_tests/thrift_upgrade_test.py b/upgrade_tests/thrift_upgrade_test.py index b427660..8846625 100644 --- a/upgrade_tests/thrift_upgrade_test.py +++ b/upgrade_tests/thrift_upgrade_test.py @@ -2,15 +2,17 @@ import itertools import pytest import logging -from cassandra.query import dict_factory +from cassandra.query import dict_factory, SimpleStatement from dtest import RUN_STATIC_UPGRADE_MATRIX, Tester from thrift_bindings.thrift010 import Cassandra from thrift_bindings.thrift010.Cassandra import (Column, ColumnDef, - ColumnParent, ConsistencyLevel, - SlicePredicate, SliceRange) + ColumnParent, ConsistencyLevel, + IndexType, + SlicePredicate, SliceRange) from thrift_test import _i64, get_thrift_client -from tools.assertions import assert_length_equal, assert_lists_of_dicts_equal +from tools.assertions import (assert_all, assert_length_equal, + assert_lists_of_dicts_equal) from tools.misc import wait_for_agreement, add_skip from .upgrade_base import UpgradeTester from .upgrade_manifest import build_upgrade_pairs @@ -187,7 +189,6 @@ def _validate_dense_thrift(client, cf='dense_super_1'): assert cosc.super_column.columns[0].name == _i64(100) assert cosc.super_column.columns[0].value == 'value1'.encode() - @pytest.mark.upgrade_test class TestUpgradeSuperColumnsThrough(Tester): def upgrade_to_version(self, tag, nodes=None): @@ -373,6 +374,190 @@ class TestUpgradeSuperColumnsThrough(Tester): @pytest.mark.upgrade_test +@since('4') +class TestUpgradeTo40(Tester): + 
""" + Thrift is dead in 4.0. However, we still want to ensure users that used thrift + in 3.0 or earlier have an upgrade path to 4.0 and this class provides tests + cases for this. + + Note that we don't want to run this if the "current" version (the one we're + upgrading to) is not 4.0 or more, as the tests makes assumptions on that. + """ + def prepare(self, start_version, num_nodes=1, rf=1): + """ + Prepare the test, starting a cluster on the initial version, creating + a keyspace (named 'ks') and returning a CQL and a thrift connection to + the first node (and set on the created keyspace). + + :param start_version: the version to set the node at initially. + :param num_nodes: the number of nodes to use. + :param rf: replication factor for the keyspace created. + :return: a pair (cql, thrift) of a CQL connection and an open thrift + connection to the first node in the cluster. + """ + self.cluster.set_install_dir(version=start_version) + self.fixture_dtest_setup.reinitialize_cluster_for_different_version() + + self.cluster.populate(num_nodes) + for node in self.cluster.nodelist(): + node.set_configuration_options(values={'start_rpc': 'true'}) + + self.cluster.start() + logger.debug("Started node on %s", start_version) + + node = self.cluster.nodelist()[0] + cql = self.patient_cql_connection(node) + + cql.execute("CREATE KEYSPACE ks WITH replication = {{ 'class': 'SimpleStrategy', 'replication_factor': '{}' }}".format(rf)) + cql.execute("USE ks") + + host, port = node.network_interfaces['thrift'] + thrift = get_thrift_client(host, port) + thrift.transport.open() + thrift.set_keyspace('ks') + return cql, thrift + + def _connect(self, node): + connection = self.patient_cql_connection(node) + connection.execute("USE ks") + return connection + + def _do_rolling_upgrade(self, after_each_upgrade=None): + """ + Upgrade all the nodes in the cluster to the "current" version (so 4.0+) + in a rolling fashion. 
+ + :param after_each_upgrade: if not None, a function that is called with 2 + arguments (in that order): + - the index of the node we just upgraded. + - a CQL connection to that node we just upgraded + :return: a CQL connection to the first node in the cluster (now upgraded). + """ + for idx, node in enumerate(self.cluster.nodelist(), start=1): + self.set_node_to_current_version(node) + upgraded_version = node.get_cassandra_version() + logger.debug("Upgrading node %i (%s) to %s", idx, node.address(), upgraded_version) + node.stop() + node.start() + if after_each_upgrade: + after_each_upgrade(idx, self._connect(node)) + + return self._connect(self.cluster.nodelist()[0]) + + def test_keys_index_3_0_created(self): + self.test_keys_index_3_x_created('github:apache/cassandra-3.0') + + def test_keys_index_3_11_created(self): + self.test_keys_index_3_x_created('github:apache/cassandra-3.11') + + def test_keys_index_3_x_created(self, from_version): + cql, thrift = self.prepare(start_version=from_version, + num_nodes=3, + rf=3) + + # Create a table with a KEYS index. This can only be done from thrift. + logger.debug("Creating table with index from thrift") + + indexed_column = ColumnDef('c1'.encode(), 'UTF8Type', IndexType.KEYS, 'idx') + other_column = ColumnDef('c2'.encode(), 'UTF8Type', None, None) + table_def = Cassandra.CfDef( + 'ks', + 'ti', + key_validation_class='UTF8Type', + comparator_type='UTF8Type', + default_validation_class='UTF8Type', + column_metadata=[indexed_column, other_column], + ) + thrift.system_add_column_family(table_def) + logger.debug("Waiting for schema agreement") + wait_for_agreement(thrift) + + # We're going to insert and delete some rows, and need to validate the + # indexed entries are what we expect. 
To make this easier, we define + # _insert and _delete methods that not only insert/delete the provided + # rows, but also keep track of all the entries whose 'c1 == v1' in + # `expected_entries`, as that is the index value we'll use for + # validation. + + expected_entries = [] + + def _insert(connection, r): + logger.debug("Inserting %s", r) + q = "INSERT INTO ti(key, c1, c2) VALUES ('{}', '{}', '{}')".format(r[0], r[1], r[2]) + connection.execute(SimpleStatement(q, consistency_level=ConsistencyLevel.QUORUM)) + if r[1] == 'v1': + expected_entries.append(r) + + def _delete(connection, r): + logger.debug("Deleting %s", r) + q = "DELETE FROM ti WHERE key='{}'".format(r[0]) + connection.execute(SimpleStatement(q, consistency_level=ConsistencyLevel.QUORUM)) + if r[1] == 'v1': + expected_entries.remove(r) + + def _validate_entries(connection): + logger.debug("Expecting entries %s", expected_entries) + assert_all(connection, "SELECT key, c2 FROM ti WHERE c1='v1'", + [[key, c2] for [key, _, c2] in expected_entries], + ignore_order=True, cl=ConsistencyLevel.QUORUM) + + to_insert = [ + ['k0', 'v1', 'goo'], + ['k1', 'v1', 'foo'], + ['k2', 'v2', 'bar'], + ['k3', 'v1', 'baz'], + ['k4', 'v3', 'oof'], + ['k5', 'v0', 'zab'], + ] + for row in to_insert: + _insert(cql, row) + + # Sanity check that we can query the index properly + logger.debug("Checking index before upgrade") + _validate_entries(cql) + + # Delete one entry, so we test upgrade with a tombstone in + _delete(cql, to_insert[1]) + _validate_entries(cql) + + # Before upgrading, we need to DROP COMPACT first, or this won't work. + cql.execute("ALTER TABLE ti DROP COMPACT STORAGE") + + # Let's make sure our DROP COMPACT STORAGE didn't break our index even + # before upgrade. + _validate_entries(cql) + + # At every step, we'll add a few entries and ensure we can query the + # index. 
Specifically, each node will add 4 keys, 2 indexed, 2 non + # indexed, query from all nodes, then remove one of the indexed entries + # and query again. + def _after_upgrade(idx, client): + logger.debug("Checking index after upgrade of node %i", idx) + + added = [] + for i in range(4): + key = 'k{}{}'.format(idx, i) + c1 = 'v1' if i % 2 == 0 else 'v2' + c2 = 'val{}{}'.format(idx, i) + to_add = [key, c1, c2] + _insert(client, to_add) + added.append(to_add) + + # Test querying from every node, so we hit both upgraded and + # non-upgraded in general + for idx, node in enumerate(self.cluster.nodelist(), start=1): + _validate_entries(self._connect(node)) + + _delete(client, added[0]) + + for idx, node in enumerate(self.cluster.nodelist(), start=1): + _validate_entries(self._connect(node)) + + self._do_rolling_upgrade(_after_upgrade) + + +@pytest.mark.upgrade_test @since('2.1', max_version='3.99') class TestThrift(UpgradeTester): """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org