http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/scrub_test.py ---------------------------------------------------------------------- diff --git a/scrub_test.py b/scrub_test.py index 4f9d22b..2df11c7 100644 --- a/scrub_test.py +++ b/scrub_test.py @@ -4,28 +4,33 @@ import re import subprocess import time import uuid - +import pytest import parse +import logging + from ccmlib import common -from dtest import Tester, debug, create_ks, create_cf +from dtest import Tester, create_ks, create_cf from tools.assertions import assert_length_equal, assert_stderr_clean -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) KEYSPACE = 'ks' class TestHelper(Tester): - def setUp(self): + @pytest.fixture(scope='function', autouse=True) + def fixture_set_cluster_settings(self, fixture_dtest_setup): """ disable JBOD configuration for scrub tests. range-aware JBOD can skip generation in SSTable, and some tests rely on generation numbers/ (see CASSANDRA-11693 and increase_sstable_generations) """ - super(TestHelper, self).setUp() - self.cluster.set_datadir_count(1) + fixture_dtest_setup.cluster.set_datadir_count(1) + def get_table_paths(self, table): """ @@ -56,13 +61,13 @@ class TestHelper(Tester): Return the sstable files at a specific location """ ret = [] - debug('Checking sstables in {}'.format(paths)) + logger.debug('Checking sstables in {}'.format(paths)) for ext in ('*.db', '*.txt', '*.adler32', '*.sha1'): for path in paths: for fname in glob.glob(os.path.join(path, ext)): bname = os.path.basename(fname) - debug('Found sstable file {}'.format(bname)) + logger.debug('Found sstable file {}'.format(bname)) ret.append(bname) return ret @@ -77,7 +82,7 @@ class TestHelper(Tester): for path in paths: fullname = os.path.join(path, fname) if (os.path.exists(fullname)): - debug('Deleting {}'.format(fullname)) + logger.debug('Deleting {}'.format(fullname)) os.remove(fullname) def get_sstables(self, table, indexes): @@ -86,12 +91,12 @@ class TestHelper(Tester): """ sstables = {} table_sstables = self.get_sstable_files(self.get_table_paths(table)) - self.assertGreater(len(table_sstables), 0) + assert len(table_sstables) > 0 sstables[table] = sorted(table_sstables) for index in indexes: index_sstables = self.get_sstable_files(self.get_index_paths(table, index)) - self.assertGreater(len(index_sstables), 0) + assert len(index_sstables) > 0 sstables[index] = sorted('{}/{}'.format(index, sstable) for sstable in index_sstables) return sstables @@ -112,16 +117,16 @@ class TestHelper(Tester): node1 = self.cluster.nodelist()[0] env = common.make_cassandra_env(node1.get_install_cassandra_root(), node1.get_node_cassandra_root()) scrub_bin = node1.get_tool('sstablescrub') - debug(scrub_bin) + logger.debug(scrub_bin) args = [scrub_bin, ks, cf] p = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() - debug(out) + logger.debug(out.decode("utf-8")) # if we have less than 64G free space, we get this warning - ignore it - if err and "Consider adding more capacity" not in err: - debug(err) - assert_stderr_clean(err) + if err and "Consider adding more capacity" not in err.decode("utf-8"): + logger.debug(err.decode("utf-8")) + assert_stderr_clean(err.decode("utf-8")) def perform_node_tool_cmd(self, cmd, table, indexes): """ @@ -169,11 +174,11 @@ class TestHelper(Tester): After finding the number of existing sstables, increase all of the generations by that amount. """ - for table_or_index, table_sstables in sstables.items(): + for table_or_index, table_sstables in list(sstables.items()): increment_by = len(set(parse.search('{}-{increment_by}-{suffix}.{file_extention}', s).named['increment_by'] for s in table_sstables)) sstables[table_or_index] = [self.increment_generation_by(s, increment_by) for s in table_sstables] - debug('sstables after increment {}'.format(str(sstables))) + logger.debug('sstables after increment {}'.format(str(sstables))) @since('2.2') @@ -228,18 +233,18 @@ class TestScrubIndexes(TestHelper): scrubbed_sstables = self.scrub('users', 'gender_idx', 'state_idx', 'birth_year_idx') self.increase_sstable_generations(initial_sstables) - self.assertEqual(initial_sstables, scrubbed_sstables) + assert initial_sstables == scrubbed_sstables users = self.query_users(session) - self.assertEqual(initial_users, users) + assert initial_users == users # Scrub and check sstables and data again scrubbed_sstables = self.scrub('users', 'gender_idx', 'state_idx', 'birth_year_idx') self.increase_sstable_generations(initial_sstables) - self.assertEqual(initial_sstables, scrubbed_sstables) + assert initial_sstables == scrubbed_sstables users = self.query_users(session) - self.assertEqual(initial_users, users) + assert initial_users == users # Restart and check data again cluster.stop() @@ -249,7 +254,7 @@ class TestScrubIndexes(TestHelper): session.execute('USE {}'.format(KEYSPACE)) users = self.query_users(session) - self.assertEqual(initial_users, users) + assert initial_users == users def test_standalone_scrub(self): cluster = self.cluster @@ -269,14 +274,14 @@ class TestScrubIndexes(TestHelper): scrubbed_sstables = self.standalonescrub('users', 'gender_idx', 'state_idx', 'birth_year_idx') self.increase_sstable_generations(initial_sstables) - self.assertEqual(initial_sstables, scrubbed_sstables) + assert initial_sstables == scrubbed_sstables cluster.start() session = self.patient_cql_connection(node1) session.execute('USE {}'.format(KEYSPACE)) users = self.query_users(session) - self.assertEqual(initial_users, users) + assert initial_users == users def test_scrub_collections_table(self): cluster = self.cluster @@ -297,25 +302,25 @@ class TestScrubIndexes(TestHelper): session.execute(("UPDATE users set uuids = [{id}] where user_id = {user_id}").format(id=_id, user_id=user_uuid)) initial_users = list(session.execute(("SELECT * from users where uuids contains {some_uuid}").format(some_uuid=_id))) - self.assertEqual(num_users, len(initial_users)) + assert num_users == len(initial_users) initial_sstables = self.flush('users', 'user_uuids_idx') scrubbed_sstables = self.scrub('users', 'user_uuids_idx') self.increase_sstable_generations(initial_sstables) - self.assertEqual(initial_sstables, scrubbed_sstables) + assert initial_sstables == scrubbed_sstables users = list(session.execute(("SELECT * from users where uuids contains {some_uuid}").format(some_uuid=_id))) - self.assertEqual(initial_users, users) + assert initial_users == users scrubbed_sstables = self.scrub('users', 'user_uuids_idx') self.increase_sstable_generations(initial_sstables) - self.assertEqual(initial_sstables, scrubbed_sstables) + assert initial_sstables == scrubbed_sstables users = list(session.execute(("SELECT * from users where uuids contains {some_uuid}").format(some_uuid=_id))) - self.assertListEqual(initial_users, users) + assert initial_users == users class TestScrub(TestHelper): @@ -365,18 +370,18 @@ class TestScrub(TestHelper): scrubbed_sstables = self.scrub('users') self.increase_sstable_generations(initial_sstables) - self.assertEqual(initial_sstables, scrubbed_sstables) + assert initial_sstables == scrubbed_sstables users = self.query_users(session) - self.assertEqual(initial_users, users) + assert initial_users == users # Scrub and check sstables and data again scrubbed_sstables = self.scrub('users') self.increase_sstable_generations(initial_sstables) - self.assertEqual(initial_sstables, scrubbed_sstables) + assert initial_sstables == scrubbed_sstables users = self.query_users(session) - self.assertEqual(initial_users, users) + assert initial_users == users # Restart and check data again cluster.stop() @@ -386,7 +391,7 @@ class TestScrub(TestHelper): session.execute('USE {}'.format(KEYSPACE)) users = self.query_users(session) - self.assertEqual(initial_users, users) + assert initial_users == users def test_standalone_scrub(self): cluster = self.cluster @@ -406,14 +411,14 @@ class TestScrub(TestHelper): scrubbed_sstables = self.standalonescrub('users') self.increase_sstable_generations(initial_sstables) - self.assertEqual(initial_sstables, scrubbed_sstables) + assert initial_sstables == scrubbed_sstables cluster.start() session = self.patient_cql_connection(node1) session.execute('USE {}'.format(KEYSPACE)) users = self.query_users(session) - self.assertEqual(initial_users, users) + assert initial_users == users def test_standalone_scrub_essential_files_only(self): cluster = self.cluster @@ -435,14 +440,14 @@ class TestScrub(TestHelper): scrubbed_sstables = self.standalonescrub('users') self.increase_sstable_generations(initial_sstables) - self.assertEqual(initial_sstables, scrubbed_sstables) + assert initial_sstables == scrubbed_sstables cluster.start() session = self.patient_cql_connection(node1) session.execute('USE {}'.format(KEYSPACE)) users = self.query_users(session) - self.assertEqual(initial_users, users) + assert initial_users == users def test_scrub_with_UDT(self): """ @@ -460,4 +465,4 @@ class TestScrub(TestHelper): node1.nodetool("scrub") time.sleep(2) match = node1.grep_log("org.apache.cassandra.serializers.MarshalException: Not enough bytes to read a set") - self.assertEqual(len(match), 0) + assert len(match) == 0
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/secondary_indexes_test.py ---------------------------------------------------------------------- diff --git a/secondary_indexes_test.py b/secondary_indexes_test.py index 9cb6cc0..9b0f326 100644 --- a/secondary_indexes_test.py +++ b/secondary_indexes_test.py @@ -3,7 +3,10 @@ import random import re import time import uuid -from unittest import skipIf +import pytest +import logging + +from flaky import flaky from cassandra import InvalidRequest from cassandra.concurrent import (execute_concurrent, @@ -11,12 +14,13 @@ from cassandra.concurrent import (execute_concurrent, from cassandra.protocol import ConfigurationException from cassandra.query import BatchStatement, SimpleStatement -from dtest import (DISABLE_VNODES, OFFHEAP_MEMTABLES, Tester, debug, CASSANDRA_VERSION_FROM_BUILD, create_ks, create_cf) +from dtest import Tester, create_ks, create_cf from tools.assertions import assert_bootstrap_state, assert_invalid, assert_none, assert_one, assert_row_count from tools.data import block_until_index_is_built, rows_to_list -from tools.decorators import since from tools.misc import new_node +since = pytest.mark.since +logger = logging.getLogger(__name__) class TestSecondaryIndexes(Tester): @@ -30,7 +34,7 @@ class TestSecondaryIndexes(Tester): files.extend(os.listdir(index_sstables_dir)) return set(files) - def data_created_before_index_not_returned_in_where_query_test(self): + def test_data_created_before_index_not_returned_in_where_query(self): """ @jira_ticket CASSANDRA-3367 """ @@ -79,9 +83,10 @@ class TestSecondaryIndexes(Tester): session.execute("CREATE INDEX b_index ON ks.cf (b);") num_rows = 100 for i in range(num_rows): - indexed_value = i % (num_rows / 3) + indexed_value = i % (num_rows // 3) # use the same indexed value three times - session.execute("INSERT INTO ks.cf (a, b) VALUES ('%d', '%d');" % (i, indexed_value)) + session.execute("INSERT INTO ks.cf (a, b) VALUES ('{a}', '{b}');" + .format(a=i, b=indexed_value)) cluster.flush() @@ -99,31 +104,32 @@ class TestSecondaryIndexes(Tester): if match: concurrency = int(match.group(1)) expected_per_range = float(match.group(2)) - self.assertTrue(concurrency > 1, "Expected more than 1 concurrent range request, got %d" % concurrency) - self.assertTrue(expected_per_range > 0) + assert concurrency > 1, "Expected more than 1 concurrent range request, got %d" % concurrency + assert expected_per_range > 0 break else: self.fail("Didn't find matching trace event") query = SimpleStatement("SELECT * FROM ks.cf WHERE b='1';") result = session.execute(query, trace=True) - self.assertEqual(3, len(list(result))) + assert 3 == len(list(result)) check_trace_events(result.get_query_trace()) query = SimpleStatement("SELECT * FROM ks.cf WHERE b='1' LIMIT 100;") result = session.execute(query, trace=True) - self.assertEqual(3, len(list(result))) + assert 3 == len(list(result)) check_trace_events(result.get_query_trace()) query = SimpleStatement("SELECT * FROM ks.cf WHERE b='1' LIMIT 3;") result = session.execute(query, trace=True) - self.assertEqual(3, len(list(result))) + assert 3 == len(list(result)) check_trace_events(result.get_query_trace()) for limit in (1, 2): result = list(session.execute("SELECT * FROM ks.cf WHERE b='1' LIMIT %d;" % (limit,))) - self.assertEqual(limit, len(result)) + assert limit == len(result) + @flaky(3) def test_6924_dropping_ks(self): """ @jira_ticket CASSANDRA-6924 @@ -152,7 +158,7 @@ class TestSecondaryIndexes(Tester): # This only occurs when dropping and recreating with # the same name, so loop through this test a few times: for i in range(10): - debug("round %s" % i) + logger.debug("round %s" % i) try: session.execute("DROP KEYSPACE ks") except ConfigurationException: @@ -170,8 +176,9 @@ class TestSecondaryIndexes(Tester): rows = session.execute("select count(*) from ks.cf WHERE col1='asdf'") count = rows[0][0] - self.assertEqual(count, 10) + assert count == 10 + @flaky def test_6924_dropping_cf(self): """ @jira_ticket CASSANDRA-6924 @@ -190,7 +197,7 @@ class TestSecondaryIndexes(Tester): # This only occurs when dropping and recreating with # the same name, so loop through this test a few times: for i in range(10): - debug("round %s" % i) + logger.debug("round %s" % i) try: session.execute("DROP COLUMNFAMILY ks.cf") except InvalidRequest: @@ -207,7 +214,7 @@ class TestSecondaryIndexes(Tester): rows = session.execute("select count(*) from ks.cf WHERE col1='asdf'") count = rows[0][0] - self.assertEqual(count, 10) + assert count == 10 def test_8280_validate_indexed_values(self): """ @@ -282,21 +289,8 @@ class TestSecondaryIndexes(Tester): pass def wait_for_schema_agreement(self, session): - rows = list(session.execute("SELECT schema_version FROM system.local")) - local_version = rows[0] - - all_match = True - rows = list(session.execute("SELECT schema_version FROM system.peers")) - for peer_version in rows: - if peer_version != local_version: - all_match = False - break - - if all_match: - return - else: - time.sleep(1) - self.wait_for_schema_agreement(session) + if not session.cluster.control_connection.wait_for_schema_agreement(wait_time=120): + raise AssertionError("Failed to reach schema agreement") @since('3.0') def test_manual_rebuild_index(self): @@ -316,18 +310,18 @@ class TestSecondaryIndexes(Tester): block_until_index_is_built(node1, session, 'keyspace1', 'standard1', 'ix_c0') stmt = session.prepare('select * from standard1 where "C0" = ?') - self.assertEqual(1, len(list(session.execute(stmt, [lookup_value])))) + assert 1 == len(list(session.execute(stmt, [lookup_value]))) before_files = self._index_sstables_files(node1, 'keyspace1', 'standard1', 'ix_c0') node1.nodetool("rebuild_index keyspace1 standard1 ix_c0") block_until_index_is_built(node1, session, 'keyspace1', 'standard1', 'ix_c0') after_files = self._index_sstables_files(node1, 'keyspace1', 'standard1', 'ix_c0') - self.assertNotEqual(before_files, after_files) - self.assertEqual(1, len(list(session.execute(stmt, [lookup_value])))) + assert before_files != after_files + assert 1 == len(list(session.execute(stmt, [lookup_value]))) # verify that only the expected row is present in the build indexes table - self.assertEqual(1, len(list(session.execute("""SELECT * FROM system."IndexInfo";""")))) + assert 1 == len(list(session.execute("""SELECT * FROM system."IndexInfo";"""))) @since('4.0') def test_failing_manual_rebuild_index(self): @@ -355,12 +349,12 @@ class TestSecondaryIndexes(Tester): # Simulate a failing index rebuild before_files = self._index_sstables_files(node, 'k', 't', 'idx') node.byteman_submit(['./byteman/index_build_failure.btm']) - with self.assertRaises(Exception): + with pytest.raises(Exception): node.nodetool("rebuild_index k t idx") after_files = self._index_sstables_files(node, 'k', 't', 'idx') # Verify that the index is not rebuilt, not marked as built, and it still can answer queries - self.assertEqual(before_files, after_files) + assert before_files == after_files assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""") assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) @@ -374,19 +368,19 @@ class TestSecondaryIndexes(Tester): after_files = self._index_sstables_files(node, 'k', 't', 'idx') # Verify that, the index is rebuilt, marked as built, and it can answer queries - self.assertNotEqual(before_files, after_files) + assert before_files != after_files assert_one(session, """SELECT table_name, index_name FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) # Simulate another failing index rebuild before_files = self._index_sstables_files(node, 'k', 't', 'idx') node.byteman_submit(['./byteman/index_build_failure.btm']) - with self.assertRaises(Exception): + with pytest.raises(Exception): node.nodetool("rebuild_index k t idx") after_files = self._index_sstables_files(node, 'k', 't', 'idx') # Verify that the index is not rebuilt, not marked as built, and it still can answer queries - self.assertEqual(before_files, after_files) + assert before_files == after_files assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""") assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) @@ -397,7 +391,7 @@ class TestSecondaryIndexes(Tester): after_files = self._index_sstables_files(node, 'k', 't', 'idx') # Verify that the index is rebuilt, marked as built, and it can answer queries - self.assertNotEqual(before_files, after_files) + assert before_files != after_files assert_one(session, """SELECT table_name, index_name FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) @@ -456,22 +450,22 @@ class TestSecondaryIndexes(Tester): session.execute("CREATE TABLE k.t (k int PRIMARY KEY, v int)") session.execute("INSERT INTO k.t(k, v) VALUES (0, 1)") - debug("Create the index") + logger.debug("Create the index") session.execute("CREATE INDEX idx ON k.t(v)") block_until_index_is_built(node, session, 'k', 't', 'idx') before_files = self._index_sstables_files(node, 'k', 't', 'idx') - debug("Verify the index is marked as built and it can be queried") + logger.debug("Verify the index is marked as built and it can be queried") assert_one(session, """SELECT table_name, index_name FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) - debug("Restart the node and verify the index build is not submitted") + logger.debug("Restart the node and verify the index build is not submitted") node.stop() node.start(wait_for_binary_proto=True) after_files = self._index_sstables_files(node, 'k', 't', 'idx') - self.assertEqual(before_files, after_files) + assert before_files == after_files - debug("Verify the index is still marked as built and it can be queried") + logger.debug("Verify the index is still marked as built and it can be queried") session = self.patient_cql_connection(node) assert_one(session, """SELECT table_name, index_name FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) @@ -496,7 +490,7 @@ class TestSecondaryIndexes(Tester): session.execute("INSERT INTO tbl (id, c0, c1, c2) values (uuid(), 'a', 'e', 'f');") rows = list(session.execute("SELECT * FROM tbl WHERE c0 = 'a';")) - self.assertEqual(4, len(rows)) + assert 4 == len(rows) stmt = "SELECT * FROM tbl WHERE c0 = 'a' AND c1 = 'b';" assert_invalid(session, stmt, "Cannot execute this query as it might involve data filtering and thus may have " @@ -504,7 +498,7 @@ class TestSecondaryIndexes(Tester): "performance unpredictability, use ALLOW FILTERING") rows = list(session.execute("SELECT * FROM tbl WHERE c0 = 'a' AND c1 = 'b' ALLOW FILTERING;")) - self.assertEqual(2, len(rows)) + assert 2 == len(rows) @since('3.0') def test_only_coordinator_chooses_index_for_query(self): @@ -523,7 +517,7 @@ class TestSecondaryIndexes(Tester): session.execute("CREATE INDEX b_index ON ks.cf (b);") num_rows = 100 for i in range(num_rows): - indexed_value = i % (num_rows / 3) + indexed_value = i % (num_rows // 3) # use the same indexed value three times session.execute("INSERT INTO ks.cf (a, b) VALUES ('{a}', '{b}');" .format(a=i, b=indexed_value)) @@ -560,7 +554,7 @@ class TestSecondaryIndexes(Tester): actual=match_counts[event_source], all=match_counts)) def retry_on_failure(trace, regex, expected_matches, match_counts, event_source, min_expected, max_expected): - debug("Trace event inspection did not match expected, sleeping before re-fetching trace events. " + logger.debug("Trace event inspection did not match expected, sleeping before re-fetching trace events. " "Expected: {expected} Actual: {actual}".format(expected=expected_matches, actual=match_counts)) time.sleep(2) trace.populate(max_wait=2.0) @@ -568,7 +562,7 @@ class TestSecondaryIndexes(Tester): query = SimpleStatement("SELECT * FROM ks.cf WHERE b='1';") result = session.execute(query, trace=True) - self.assertEqual(3, len(list(result))) + assert 3 == len(list(result)) trace = result.get_query_trace() @@ -586,7 +580,7 @@ class TestSecondaryIndexes(Tester): [("127.0.0.1", 1, 200), ("127.0.0.2", 1, 200), ("127.0.0.3", 1, 200)], retry_on_failure) - @skipIf(DISABLE_VNODES, "Test should only run with vnodes") + @pytest.mark.vnodes def test_query_indexes_with_vnodes(self): """ Verifies correct query behaviour in the presence of vnodes @@ -597,7 +591,7 @@ class TestSecondaryIndexes(Tester): node1, node2 = cluster.nodelist() session = self.patient_cql_connection(node1) session.execute("CREATE KEYSPACE ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': '1'};") - session.execute("CREATE TABLE ks.compact_table (a int PRIMARY KEY, b int) WITH COMPACT STORAGE;") + session.execute("CREATE TABLE ks.compact_table (a int PRIMARY KEY, b int);") session.execute("CREATE INDEX keys_index ON ks.compact_table (b);") session.execute("CREATE TABLE ks.regular_table (a int PRIMARY KEY, b int)") session.execute("CREATE INDEX composites_index on ks.regular_table (b)") @@ -605,7 +599,7 @@ class TestSecondaryIndexes(Tester): for node in cluster.nodelist(): block_until_index_is_built(node, session, 'ks', 'regular_table', 'composites_index') - insert_args = [(i, i % 2) for i in xrange(100)] + insert_args = [(i, i % 2) for i in range(100)] execute_concurrent_with_args(session, session.prepare("INSERT INTO ks.compact_table (a, b) VALUES (?, ?)"), insert_args) @@ -614,9 +608,9 @@ class TestSecondaryIndexes(Tester): insert_args) res = session.execute("SELECT * FROM ks.compact_table WHERE b = 0") - self.assertEqual(len(rows_to_list(res)), 50) + assert len(rows_to_list(res)) == 50 res = session.execute("SELECT * FROM ks.regular_table WHERE b = 0") - self.assertEqual(len(rows_to_list(res)), 50) + assert len(rows_to_list(res)) == 50 class TestSecondaryIndexesOnCollections(Tester): @@ -649,7 +643,7 @@ class TestSecondaryIndexesOnCollections(Tester): results = execute_concurrent(session, cmds * 5, raise_on_first_error=True, concurrency=200) for (success, result) in results: - self.assertTrue(success, "didn't get success on insert: {0}".format(result)) + assert success, "didn't get success on insert: {0}".format(result) session.execute("CREATE INDEX idx_single_tuple ON simple_with_tuple(single_tuple);") session.execute("CREATE INDEX idx_double_tuple ON simple_with_tuple(double_tuple);") @@ -659,25 +653,25 @@ class TestSecondaryIndexesOnCollections(Tester): # check if indexes work on existing data for n in range(50): - self.assertEqual(5, len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n))))) - self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where single_tuple = (-1);".format(n))))) - self.assertEqual(5, len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n))))) - self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},-1);".format(n))))) - self.assertEqual(5, len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n))))) - self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},-1);".format(n))))) - self.assertEqual(5, len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n))))) - self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},-1));".format(n))))) + assert 5 == len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n)))) + assert 0 == len(list(session.execute("select * from simple_with_tuple where single_tuple = (-1);".format(n)))) + assert 5 == len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n)))) + assert 0 == len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},-1);".format(n)))) + assert 5 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n)))) + assert 0 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},-1);".format(n)))) + assert 5 == len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n)))) + assert 0 == len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},-1));".format(n)))) # check if indexes work on new data inserted after index creation results = execute_concurrent(session, cmds * 3, raise_on_first_error=True, concurrency=200) for (success, result) in results: - self.assertTrue(success, "didn't get success on insert: {0}".format(result)) + assert success, "didn't get success on insert: {0}".format(result) time.sleep(5) for n in range(50): - self.assertEqual(8, len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n))))) - self.assertEqual(8, len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n))))) - self.assertEqual(8, len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n))))) - self.assertEqual(8, len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n))))) + assert 8 == len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n)))) + assert 8 == len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n)))) + assert 8 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n)))) + assert 8 == len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n)))) # check if indexes work on mutated data for n in range(5): @@ -698,15 +692,15 @@ class TestSecondaryIndexesOnCollections(Tester): session.execute("update simple_with_tuple set nested_one = (-999,(-999,-999)) where id = {0}".format(row.id)) for n in range(5): - self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n))))) - self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n))))) - self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n))))) - self.assertEqual(0, len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n))))) + assert 0 == len(list(session.execute("select * from simple_with_tuple where single_tuple = ({0});".format(n)))) + assert 0 == len(list(session.execute("select * from simple_with_tuple where double_tuple = ({0},{0});".format(n)))) + assert 0 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = ({0},{0},{0});".format(n)))) + assert 0 == len(list(session.execute("select * from simple_with_tuple where nested_one = ({0},({0},{0}));".format(n)))) - self.assertEqual(40, len(list(session.execute("select * from simple_with_tuple where single_tuple = (-999);")))) - self.assertEqual(40, len(list(session.execute("select * from simple_with_tuple where double_tuple = (-999,-999);")))) - self.assertEqual(40, len(list(session.execute("select * from simple_with_tuple where triple_tuple = (-999,-999,-999);")))) - self.assertEqual(40, len(list(session.execute("select * from simple_with_tuple where nested_one = (-999,(-999,-999));")))) + assert 40 == len(list(session.execute("select * from simple_with_tuple where single_tuple = (-999);"))) + assert 40 == len(list(session.execute("select * from simple_with_tuple where double_tuple = (-999,-999);"))) + assert 40 == len(list(session.execute("select * from simple_with_tuple where triple_tuple = (-999,-999,-999);"))) + assert 40 == len(list(session.execute("select * from simple_with_tuple where nested_one = (-999,(-999,-999));"))) def test_list_indexes(self): """ @@ -731,7 +725,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from list_index_search.users where uuids contains {some_uuid}").format(some_uuid=uuid.uuid4()) row = list(session.execute(stmt)) - self.assertEqual(0, len(row)) + assert 0 == len(row) # add a row which doesn't specify data for the indexed column, and query again user1_uuid = uuid.uuid4() @@ -742,7 +736,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from list_index_search.users where uuids contains {some_uuid}").format(some_uuid=uuid.uuid4()) row = list(session.execute(stmt)) - self.assertEqual(0, len(row)) + assert 0 == len(row) _id = uuid.uuid4() # alter the row to add a single item to the indexed list @@ -752,7 +746,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from list_index_search.users where uuids contains {some_uuid}").format(some_uuid=_id) row = list(session.execute(stmt)) - self.assertEqual(1, len(row)) + assert 1 == len(row) # add a bunch of user records and query them back shared_uuid = uuid.uuid4() # this uuid will be on all records @@ -779,7 +773,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from list_index_search.users where uuids contains {shared_uuid}").format(shared_uuid=shared_uuid) rows = list(session.execute(stmt)) result = [row for row in rows] - self.assertEqual(50000, len(result)) + assert 50000 == len(result) # shuffle the log in-place, and double-check a slice of records by querying the secondary index random.shuffle(log) @@ -789,14 +783,14 @@ class TestSecondaryIndexesOnCollections(Tester): ).format(unshared_uuid=log_entry['unshared_uuid']) rows = list(session.execute(stmt)) - self.assertEqual(1, len(rows)) + assert 1 == len(rows) db_user_id, db_email, db_uuids = rows[0] - self.assertEqual(db_user_id, log_entry['user_id']) - self.assertEqual(db_email, log_entry['email']) - self.assertEqual(str(db_uuids[0]), str(shared_uuid)) - self.assertEqual(str(db_uuids[1]), str(log_entry['unshared_uuid'])) + assert db_user_id == log_entry['user_id'] + assert db_email == log_entry['email'] + assert str(db_uuids[0]) == str(shared_uuid) + assert str(db_uuids[1]) == str(log_entry['unshared_uuid']) def test_set_indexes(self): """ @@ -820,7 +814,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from set_index_search.users where uuids contains {some_uuid}").format(some_uuid=uuid.uuid4()) row = list(session.execute(stmt)) - self.assertEqual(0, len(row)) + assert 0 == len(row) # add a row which doesn't specify data for the indexed column, and query again user1_uuid = uuid.uuid4() @@ -830,7 +824,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from set_index_search.users where uuids contains {some_uuid}").format(some_uuid=uuid.uuid4()) row = list(session.execute(stmt)) - self.assertEqual(0, len(row)) + assert 0 == len(row) _id = uuid.uuid4() # alter the row to add a single item to the indexed set @@ -839,7 +833,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from set_index_search.users where uuids contains {some_uuid}").format(some_uuid=_id) row = list(session.execute(stmt)) - self.assertEqual(1, len(row)) + assert 1 == len(row) # add a bunch of user records and query them back shared_uuid = uuid.uuid4() # this uuid will be on all records @@ -866,7 +860,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from set_index_search.users where uuids contains {shared_uuid}").format(shared_uuid=shared_uuid) rows = session.execute(stmt) result = [row for row in rows] - self.assertEqual(50000, len(result)) + assert 50000 == len(result) # shuffle the log in-place, and double-check a slice of records by querying the secondary index random.shuffle(log) @@ -876,14 +870,14 @@ class TestSecondaryIndexesOnCollections(Tester): ).format(unshared_uuid=log_entry['unshared_uuid']) rows = list(session.execute(stmt)) - self.assertEqual(1, len(rows)) + assert 1 == len(rows) db_user_id, db_email, db_uuids = rows[0] - self.assertEqual(db_user_id, log_entry['user_id']) - self.assertEqual(db_email, log_entry['email']) - self.assertTrue(shared_uuid in db_uuids) - self.assertTrue(log_entry['unshared_uuid'] in db_uuids) + assert db_user_id == log_entry['user_id'] + assert db_email == log_entry['email'] + assert shared_uuid in db_uuids + assert log_entry['unshared_uuid'] in db_uuids @since('3.0') def test_multiple_indexes_on_single_map_column(self): @@ -916,29 +910,29 @@ class TestSecondaryIndexesOnCollections(Tester): session.execute("INSERT INTO map_tbl (id, amap) values (uuid(), {'faz': 1, 'baz': 2});") value_search = list(session.execute("SELECT * FROM map_tbl WHERE amap CONTAINS 1")) - self.assertEqual(2, len(value_search), "incorrect number of rows when querying on map values") + assert 2 == len(value_search), "incorrect number of rows when querying on map values" key_search = list(session.execute("SELECT * FROM map_tbl WHERE amap CONTAINS KEY 'foo'")) - self.assertEqual(1, len(key_search), "incorrect number of rows when querying on map keys") + assert 1 == len(key_search), "incorrect number of rows when querying on map keys" entries_search = list(session.execute("SELECT * FROM map_tbl WHERE amap['foo'] = 1")) - self.assertEqual(1, len(entries_search), "incorrect number of rows when querying on map entries") + assert 1 == len(entries_search), "incorrect number of rows when querying on map entries" session.cluster.refresh_schema_metadata() table_meta = session.cluster.metadata.keyspaces["map_double_index"].tables["map_tbl"] - self.assertEqual(3, len(table_meta.indexes)) - self.assertItemsEqual(['map_keys', 'map_values', 'map_entries'], table_meta.indexes) - self.assertEqual(3, len(session.cluster.metadata.keyspaces["map_double_index"].indexes)) + assert 3 == len(table_meta.indexes) + assert {'map_keys', 'map_values', 'map_entries'} == set(table_meta.indexes.keys()) + assert 3 == len(session.cluster.metadata.keyspaces["map_double_index"].indexes) - self.assertTrue('map_keys' in table_meta.export_as_string()) - self.assertTrue('map_values' in table_meta.export_as_string()) - self.assertTrue('map_entries' in table_meta.export_as_string()) + assert 'map_keys' in table_meta.export_as_string() + assert 'map_values' in table_meta.export_as_string() + assert 'map_entries' in table_meta.export_as_string() session.execute("DROP TABLE map_tbl") session.cluster.refresh_schema_metadata() - self.assertEqual(0, len(session.cluster.metadata.keyspaces["map_double_index"].indexes)) + assert 0 == len(session.cluster.metadata.keyspaces["map_double_index"].indexes) - @skipIf(OFFHEAP_MEMTABLES, 'Hangs with offheap memtables') + @pytest.mark.no_offheap_memtables def test_map_indexes(self): """ Checks that secondary indexes on maps work for querying on both keys and values @@ -961,7 +955,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = "SELECT * from map_index_search.users where uuids contains key {some_uuid}".format(some_uuid=uuid.uuid4()) rows = list(session.execute(stmt)) - self.assertEqual(0, len(rows)) + assert 0 == len(rows) # add a row which doesn't specify data for the indexed column, and query again user1_uuid = uuid.uuid4() @@ -972,7 +966,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from map_index_search.users where uuids contains key {some_uuid}").format(some_uuid=uuid.uuid4()) rows = list(session.execute(stmt)) - self.assertEqual(0, len(rows)) + assert 0 == len(rows) _id = uuid.uuid4() @@ -983,7 +977,7 @@ class TestSecondaryIndexesOnCollections(Tester): stmt = ("SELECT * from map_index_search.users where uuids contains key {some_uuid}").format(some_uuid=_id) rows = list(session.execute(stmt)) - self.assertEqual(1, len(rows)) + assert 1 == len(rows) # add a bunch of user records and query them back shared_uuid = uuid.uuid4() # this uuid will be on all records @@ -1012,7 +1006,7 @@ class TestSecondaryIndexesOnCollections(Tester): ).format(shared_uuid=shared_uuid) rows = session.execute(stmt) result = [row for row in rows] - self.assertEqual(50000, len(result)) + assert 50000 == len(result) # shuffle the log in-place, and double-check a slice of records by querying the secondary index on keys random.shuffle(log) @@ -1023,15 +1017,15 @@ class TestSecondaryIndexesOnCollections(Tester): row = session.execute(stmt) result = list(row) - rows = self.assertEqual(1, len(result)) + assert 1 == len(result) db_user_id, db_email, db_uuids = result[0] - self.assertEqual(db_user_id, log_entry['user_id']) - self.assertEqual(db_email, log_entry['email']) + assert db_user_id == log_entry['user_id'] + assert db_email == log_entry['email'] - self.assertTrue(shared_uuid in db_uuids) - self.assertTrue(log_entry['unshared_uuid1'] in db_uuids) + assert shared_uuid in db_uuids + assert log_entry['unshared_uuid1'] in db_uuids # attempt to add an index on map values as well (should fail pre 3.0) stmt = "CREATE INDEX user_uuids_values on map_index_search.users (uuids);" @@ -1066,14 +1060,14 @@ class TestSecondaryIndexesOnCollections(Tester): ).format(unshared_uuid2=log_entry['unshared_uuid2']) rows = list(session.execute(stmt)) - self.assertEqual(1, len(rows), rows) + assert 1 == len(rows), rows db_user_id, db_email, db_uuids = rows[0] - self.assertEqual(db_user_id, log_entry['user_id']) - self.assertEqual(db_email, log_entry['email']) + assert db_user_id == log_entry['user_id'] + assert db_email == log_entry['email'] - self.assertTrue(shared_uuid in db_uuids) - self.assertTrue(log_entry['unshared_uuid2'] in db_uuids.values()) + assert shared_uuid in db_uuids + assert log_entry['unshared_uuid2'] in list(db_uuids.values()) class TestUpgradeSecondaryIndexes(Tester): @@ -1106,22 +1100,22 @@ class TestUpgradeSecondaryIndexes(Tester): node1.drain() node1.watch_log_for("DRAINED") node1.stop(wait_other_notice=False) - debug("Upgrading to current version") + logger.debug("Upgrading to current version") self.set_node_to_current_version(node1) node1.start(wait_other_notice=True) [node1] = cluster.nodelist() session = self.patient_cql_connection(node1) - debug(cluster.cassandra_version()) + logger.debug(cluster.cassandra_version()) assert_one(session, query, [0, 0]) def upgrade_to_version(self, tag, nodes=None): - debug('Upgrading to ' + tag) + logger.debug('Upgrading to ' + tag) if nodes is None: nodes = self.cluster.nodelist() for node in nodes: - debug('Shutting down node: ' + node.name) + logger.debug('Shutting down node: ' + node.name) node.drain() node.watch_log_for("DRAINED") node.stop(wait_other_notice=False) @@ -1129,25 +1123,24 @@ class TestUpgradeSecondaryIndexes(Tester): # Update Cassandra Directory for node in nodes: node.set_install_dir(version=tag) - debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) + logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir())) self.cluster.set_install_dir(version=tag) # Restart nodes on new version for node in nodes: - debug('Starting %s on new version (%s)' % (node.name, tag)) + logger.debug('Starting %s on new version (%s)' % (node.name, tag)) # Setup log4j / logback again (necessary moving from 2.0 -> 2.1): node.set_log_level("INFO") node.start(wait_other_notice=True) # node.nodetool('upgradesstables -a') -@skipIf(CASSANDRA_VERSION_FROM_BUILD == '3.9', "Test doesn't run on 3.9") @since('3.10') class TestPreJoinCallback(Tester): - def __init__(self, *args, **kwargs): - # Ignore these log patterns: - self.ignore_log_patterns = [ + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = [ # ignore all streaming errors during bootstrap r'Exception encountered during startup', r'Streaming error occurred', @@ -1156,7 +1149,6 @@ class TestPreJoinCallback(Tester): r'\[Stream.*\] Remote peer 127.0.0.\d:7000 failed stream session', r'Error while waiting on bootstrap to complete. Bootstrap will have to be restarted.' ] - Tester.__init__(self, *args, **kwargs) def _base_test(self, joinFn): cluster = self.cluster @@ -1182,16 +1174,16 @@ class TestPreJoinCallback(Tester): # Run the join function to test joinFn(cluster, tokens[1]) - def bootstrap_test(self): + def test_bootstrap(self): def bootstrap(cluster, token): node2 = new_node(cluster) node2.set_configuration_options(values={'initial_token': token}) node2.start(wait_for_binary_proto=True) - self.assertTrue(node2.grep_log('Executing pre-join post-bootstrap tasks')) + assert node2.grep_log('Executing pre-join post-bootstrap tasks') self._base_test(bootstrap) - def resume_test(self): + def test_resume(self): def resume(cluster, token): node1 = cluster.nodes['node1'] # set up byteman on node1 to inject a failure when streaming to node2 @@ -1217,33 +1209,33 @@ class TestPreJoinCallback(Tester): node2.nodetool("bootstrap resume") assert_bootstrap_state(self, node2, 'COMPLETED') - self.assertTrue(node2.grep_log('Executing pre-join post-bootstrap tasks')) + assert node2.grep_log('Executing pre-join post-bootstrap tasks') self._base_test(resume) - def manual_join_test(self): + def test_manual_join(self): def manual_join(cluster, token): node2 = new_node(cluster) node2.set_configuration_options(values={'initial_token': token}) node2.start(join_ring=False, wait_for_binary_proto=True, wait_other_notice=240) - self.assertTrue(node2.grep_log('Not joining ring as requested')) - self.assertFalse(node2.grep_log('Executing pre-join')) + assert node2.grep_log('Not joining ring as requested') + assert not node2.grep_log('Executing pre-join') node2.nodetool("join") - self.assertTrue(node2.grep_log('Executing pre-join post-bootstrap tasks')) + assert node2.grep_log('Executing pre-join post-bootstrap tasks') self._base_test(manual_join) - def write_survey_test(self): + def test_write_survey(self): def write_survey_and_join(cluster, token): node2 = new_node(cluster) node2.set_configuration_options(values={'initial_token': token}) node2.start(jvm_args=["-Dcassandra.write_survey=true"], wait_for_binary_proto=True) - self.assertTrue(node2.grep_log('Startup complete, but write survey mode is active, not becoming an active ring member.')) - self.assertFalse(node2.grep_log('Executing pre-join')) + assert node2.grep_log('Startup complete, but write survey mode is active, not becoming an active ring member.') + assert not node2.grep_log('Executing pre-join') node2.nodetool("join") - self.assertTrue(node2.grep_log('Leaving write survey mode and joining ring at operator request')) - self.assertTrue(node2.grep_log('Executing pre-join post-bootstrap tasks')) + assert node2.grep_log('Leaving write survey mode and joining ring at operator request') + assert node2.grep_log('Executing pre-join post-bootstrap tasks') self._base_test(write_survey_and_join) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/snapshot_test.py ---------------------------------------------------------------------- diff --git a/snapshot_test.py b/snapshot_test.py index 1aa5a70..9b561ce 100644 --- a/snapshot_test.py +++ b/snapshot_test.py @@ -4,16 +4,20 @@ import os import shutil import subprocess import time +import pytest +import logging from cassandra.concurrent import execute_concurrent_with_args -from dtest import (Tester, cleanup_cluster, create_ccm_cluster, create_ks, - debug, get_test_path) +from dtest_setup_overrides import DTestSetupOverrides +from dtest import Tester, create_ks from tools.assertions import assert_one from tools.files import replace_in_file, safe_mkdtemp from tools.hacks import advance_to_next_cl_segment -from tools.misc import ImmutableMapping -from tools.decorators import since +from tools.misc import ImmutableMapping, get_current_test_name + +since = pytest.mark.since +logger = logging.getLogger(__name__) class SnapshotTester(Tester): @@ -28,10 +32,10 @@ class SnapshotTester(Tester): execute_concurrent_with_args(session, insert_statement, args, concurrency=20) def make_snapshot(self, node, ks, cf, name): - debug("Making snapshot....") + logger.debug("Making snapshot....") node.flush() snapshot_cmd = 'snapshot {ks} -cf {cf} -t {name}'.format(ks=ks, cf=cf, name=name) - debug("Running snapshot cmd: {snapshot_cmd}".format(snapshot_cmd=snapshot_cmd)) + logger.debug("Running snapshot cmd: {snapshot_cmd}".format(snapshot_cmd=snapshot_cmd)) node.nodetool(snapshot_cmd) tmpdir = safe_mkdtemp() os.mkdir(os.path.join(tmpdir, ks)) @@ -47,8 +51,8 @@ class SnapshotTester(Tester): snapshot_dir = snapshot_dirs[0] else: continue - debug("snapshot_dir is : " + snapshot_dir) - debug("snapshot copy is : " + tmpdir) + logger.debug("snapshot_dir is : " + snapshot_dir) + logger.debug("snapshot copy is : " + tmpdir) # Copy files from the snapshot dir to existing temp dir distutils.dir_util.copy_tree(str(snapshot_dir), os.path.join(tmpdir, str(x), ks, cf)) @@ -57,8 +61,8 @@ class SnapshotTester(Tester): return tmpdir def restore_snapshot(self, snapshot_dir, node, ks, cf): - debug("Restoring snapshot....") - for x in xrange(0, self.cluster.data_dir_count): + logger.debug("Restoring snapshot....") + for x in range(0, self.cluster.data_dir_count): snap_dir = os.path.join(snapshot_dir, str(x), ks, cf) if os.path.exists(snap_dir): ip = node.address() @@ -70,11 +74,11 @@ class SnapshotTester(Tester): if exit_status != 0: raise Exception("sstableloader command '%s' failed; exit status: %d'; stdout: %s; stderr: %s" % - (" ".join(args), exit_status, stdout, stderr)) + (" ".join(args), exit_status, stdout.decode("utf-8"), stderr.decode("utf-8"))) def restore_snapshot_schema(self, snapshot_dir, node, ks, cf): - debug("Restoring snapshot schema....") - for x in xrange(0, self.cluster.data_dir_count): + logger.debug("Restoring snapshot schema....") + for x in range(0, self.cluster.data_dir_count): schema_path = os.path.join(snapshot_dir, str(x), ks, cf, 'schema.cql') if os.path.exists(schema_path): node.run_cqlsh(cmds="SOURCE '%s'" % schema_path) @@ -96,13 +100,13 @@ class TestSnapshot(SnapshotTester): # away when we restore: self.insert_rows(session, 100, 200) rows = session.execute('SELECT count(*) from ks.cf') - self.assertEqual(rows[0][0], 200) + assert rows[0][0] == 200 # Drop the keyspace, make sure we have no data: session.execute('DROP KEYSPACE ks') self.create_schema(session) rows = session.execute('SELECT count(*) from ks.cf') - self.assertEqual(rows[0][0], 0) + assert rows[0][0] == 0 # Restore data from snapshot: self.restore_snapshot(snapshot_dir, node1, 'ks', 'cf') @@ -110,10 +114,10 @@ class TestSnapshot(SnapshotTester): rows = session.execute('SELECT count(*) from ks.cf') # clean up - debug("removing snapshot_dir: " + snapshot_dir) + logger.debug("removing snapshot_dir: " + snapshot_dir) shutil.rmtree(snapshot_dir) - self.assertEqual(rows[0][0], 100) + assert rows[0][0] == 100 @since('3.0') def test_snapshot_and_restore_drop_table_remove_dropped_column(self): @@ -146,7 +150,7 @@ class TestSnapshot(SnapshotTester): assert_one(session, "SELECT * FROM ks.cf", [1, "a", "b"]) # Clean up - debug("removing snapshot_dir: " + snapshot_dir) + logger.debug("removing snapshot_dir: " + snapshot_dir) shutil.rmtree(snapshot_dir) @since('3.11') @@ -182,22 +186,27 @@ class TestSnapshot(SnapshotTester): assert_one(session, "SELECT * FROM ks.cf", [1, "a"]) # Clean up - debug("removing snapshot_dir: " + snapshot_dir) + logger.debug("removing snapshot_dir: " + snapshot_dir) shutil.rmtree(snapshot_dir) class TestArchiveCommitlog(SnapshotTester): - cluster_options = ImmutableMapping({'commitlog_segment_size_in_mb': 1}) + + @pytest.fixture(scope='function', autouse=True) + def fixture_dtest_setup_overrides(self): + dtest_setup_overrides = DTestSetupOverrides() + dtest_setup_overrides.cluster_options = ImmutableMapping({'start_rpc': 'true'}) + return dtest_setup_overrides def make_snapshot(self, node, ks, cf, name): - debug("Making snapshot....") + logger.debug("Making snapshot....") node.flush() snapshot_cmd = 'snapshot {ks} -cf {cf} -t {name}'.format(ks=ks, cf=cf, name=name) - debug("Running snapshot cmd: {snapshot_cmd}".format(snapshot_cmd=snapshot_cmd)) + logger.debug("Running snapshot cmd: {snapshot_cmd}".format(snapshot_cmd=snapshot_cmd)) node.nodetool(snapshot_cmd) tmpdirs = [] base_tmpdir = safe_mkdtemp() - for x in xrange(0, self.cluster.data_dir_count): + for x in range(0, self.cluster.data_dir_count): tmpdir = os.path.join(base_tmpdir, str(x)) os.mkdir(tmpdir) # Copy files from the snapshot dir to existing temp dir @@ -207,7 +216,7 @@ class TestArchiveCommitlog(SnapshotTester): return tmpdirs def restore_snapshot(self, snapshot_dir, node, ks, cf, name): - debug("Restoring snapshot for cf ....") + logger.debug("Restoring snapshot for cf ....") data_dir = os.path.join(node.get_path(), 'data{0}'.format(os.path.basename(snapshot_dir))) cfs = [s for s in os.listdir(snapshot_dir) if s.startswith(cf + "-")] if len(cfs) > 0: @@ -220,7 +229,7 @@ class TestArchiveCommitlog(SnapshotTester): os.mkdir(os.path.join(data_dir, ks)) os.mkdir(os.path.join(data_dir, ks, cf_id)) - debug("snapshot_dir is : " + snapshot_dir) + logger.debug("snapshot_dir is : " + snapshot_dir) distutils.dir_util.copy_tree(snapshot_dir, os.path.join(data_dir, ks, cf_id)) def test_archive_commitlog(self): @@ -232,7 +241,7 @@ class TestArchiveCommitlog(SnapshotTester): """ self.run_archive_commitlog(restore_point_in_time=False, archive_active_commitlogs=True) - def dont_test_archive_commitlog(self): + def test_dont_archive_commitlog(self): """ Run the archive commitlog test, but forget to add the restore commands """ @@ -267,7 +276,7 @@ class TestArchiveCommitlog(SnapshotTester): # Create a temp directory for storing commitlog archives: tmp_commitlog = safe_mkdtemp() - debug("tmp_commitlog: " + tmp_commitlog) + logger.debug("tmp_commitlog: " + tmp_commitlog) # Edit commitlog_archiving.properties and set an archive # command: @@ -289,14 +298,14 @@ class TestArchiveCommitlog(SnapshotTester): ) session.execute('CREATE TABLE ks.cf ( key bigint PRIMARY KEY, val text);') - debug("Writing first 30,000 rows...") + logger.debug("Writing first 30,000 rows...") self.insert_rows(session, 0, 30000) # Record when this first set of inserts finished: insert_cutoff_times = [time.gmtime()] # Delete all commitlog backups so far: for f in glob.glob(tmp_commitlog + "/*"): - debug('Removing {}'.format(f)) + logger.debug('Removing {}'.format(f)) os.remove(f) snapshot_dirs = self.make_snapshot(node1, 'ks', 'cf', 'basic') @@ -323,14 +332,14 @@ class TestArchiveCommitlog(SnapshotTester): try: # Write more data: - debug("Writing second 30,000 rows...") + logger.debug("Writing second 30,000 rows...") self.insert_rows(session, 30000, 60000) node1.flush() time.sleep(10) # Record when this second set of inserts finished: insert_cutoff_times.append(time.gmtime()) - debug("Writing final 5,000 rows...") + logger.debug("Writing final 5,000 rows...") self.insert_rows(session, 60000, 65000) # Record when the third set of inserts finished: insert_cutoff_times.append(time.gmtime()) @@ -340,17 +349,16 @@ class TestArchiveCommitlog(SnapshotTester): rows = session.execute('SELECT count(*) from ks.cf') # Make sure we have the same amount of rows as when we snapshotted: - self.assertEqual(rows[0][0], 65000) + assert rows[0][0] == 65000 # Check that there are at least one commit log backed up that # is not one of the active commit logs: commitlog_dir = os.path.join(node1.get_path(), 'commitlogs') - debug("node1 commitlog dir: " + commitlog_dir) - debug("node1 commitlog dir contents: " + str(os.listdir(commitlog_dir))) - debug("tmp_commitlog contents: " + str(os.listdir(tmp_commitlog))) + logger.debug("node1 commitlog dir: " + commitlog_dir) + logger.debug("node1 commitlog dir contents: " + str(os.listdir(commitlog_dir))) + logger.debug("tmp_commitlog contents: " + str(os.listdir(tmp_commitlog))) - self.assertNotEqual(set(os.listdir(tmp_commitlog)) - set(os.listdir(commitlog_dir)), - set()) + assert_directory_not_empty(tmp_commitlog, commitlog_dir) cluster.flush() cluster.compact() @@ -358,15 +366,16 @@ class TestArchiveCommitlog(SnapshotTester): # Destroy the cluster cluster.stop() - debug("node1 commitlog dir contents after stopping: " + str(os.listdir(commitlog_dir))) - debug("tmp_commitlog contents after stopping: " + str(os.listdir(tmp_commitlog))) + logger.debug("node1 commitlog dir contents after stopping: " + str(os.listdir(commitlog_dir))) + logger.debug("tmp_commitlog contents after stopping: " + str(os.listdir(tmp_commitlog))) - self.copy_logs(self.cluster, name=self.id().split(".")[0] + "_pre-restore") - cleanup_cluster(self.cluster, self.test_path) - self.test_path = get_test_path() - cluster = self.cluster = create_ccm_cluster(self.test_path, name='test') + self.copy_logs(name=get_current_test_name() + "_pre-restore") + self.fixture_dtest_setup.cleanup_and_replace_cluster() + cluster = self.cluster cluster.populate(1) - node1, = cluster.nodelist() + nodes = cluster.nodelist() + assert len(nodes) == 1 + node1 = nodes[0] # Restore schema from snapshots: for system_ks_snapshot_dir in system_ks_snapshot_dirs: @@ -400,7 +409,7 @@ class TestArchiveCommitlog(SnapshotTester): rows = session.execute('SELECT count(*) from ks.cf') # Make sure we have the same amount of rows as when we snapshotted: - self.assertEqual(rows[0][0], 30000) + assert rows[0][0] == 30000 # Edit commitlog_archiving.properties. Remove the archive # command and set a restore command and restore_directories: @@ -416,7 +425,7 @@ class TestArchiveCommitlog(SnapshotTester): replace_in_file(os.path.join(node1.get_path(), 'conf', 'commitlog_archiving.properties'), [(r'^restore_point_in_time=.*$', 'restore_point_in_time={restore_time}'.format(restore_time=restore_time))]) - debug("Restarting node1..") + logger.debug("Restarting node1..") node1.stop() node1.start(wait_for_binary_proto=True) @@ -428,31 +437,31 @@ class TestArchiveCommitlog(SnapshotTester): # Now we should have 30000 rows from the snapshot + 30000 rows # from the commitlog backups: if not restore_archived_commitlog: - self.assertEqual(rows[0][0], 30000) + assert rows[0][0] == 30000 elif restore_point_in_time: - self.assertEqual(rows[0][0], 60000) + assert rows[0][0] == 60000 else: - self.assertEqual(rows[0][0], 65000) + assert rows[0][0] == 65000 finally: # clean up - debug("removing snapshot_dir: " + ",".join(snapshot_dirs)) + logger.debug("removing snapshot_dir: " + ",".join(snapshot_dirs)) for snapshot_dir in snapshot_dirs: shutil.rmtree(snapshot_dir) - debug("removing snapshot_dir: " + ",".join(system_ks_snapshot_dirs)) + logger.debug("removing snapshot_dir: " + ",".join(system_ks_snapshot_dirs)) for system_ks_snapshot_dir in system_ks_snapshot_dirs: shutil.rmtree(system_ks_snapshot_dir) - debug("removing snapshot_dir: " + ",".join(system_cfs_snapshot_dirs)) + logger.debug("removing snapshot_dir: " + ",".join(system_cfs_snapshot_dirs)) for system_cfs_snapshot_dir in system_cfs_snapshot_dirs: shutil.rmtree(system_cfs_snapshot_dir) - debug("removing snapshot_dir: " + ",".join(system_ut_snapshot_dirs)) + logger.debug("removing snapshot_dir: " + ",".join(system_ut_snapshot_dirs)) for system_ut_snapshot_dir in system_ut_snapshot_dirs: shutil.rmtree(system_ut_snapshot_dir) - debug("removing snapshot_dir: " + ",".join(system_col_snapshot_dirs)) + logger.debug("removing snapshot_dir: " + ",".join(system_col_snapshot_dirs)) for system_col_snapshot_dir in system_col_snapshot_dirs: shutil.rmtree(system_col_snapshot_dir) - debug("removing tmp_commitlog: " + tmp_commitlog) + logger.debug("removing tmp_commitlog: " + tmp_commitlog) shutil.rmtree(tmp_commitlog) def test_archive_and_restore_commitlog_repeatedly(self): @@ -461,14 +470,13 @@ class TestArchiveCommitlog(SnapshotTester): Run archive commit log restoration test repeatedly to make sure it is idempotent and doesn't fail if done repeatedly """ - cluster = self.cluster cluster.populate(1) node1 = cluster.nodelist()[0] # Create a temp directory for storing commitlog archives: tmp_commitlog = safe_mkdtemp() - debug("tmp_commitlog: {}".format(tmp_commitlog)) + logger.debug("tmp_commitlog: {}".format(tmp_commitlog)) # Edit commitlog_archiving.properties and set an archive # command: @@ -481,32 +489,31 @@ class TestArchiveCommitlog(SnapshotTester): cluster.start(wait_for_binary_proto=True) - debug("Creating initial connection") + logger.debug("Creating initial connection") session = self.patient_cql_connection(node1) create_ks(session, 'ks', 1) session.execute('CREATE TABLE ks.cf ( key bigint PRIMARY KEY, val text);') - debug("Writing 30,000 rows...") + logger.debug("Writing 30,000 rows...") self.insert_rows(session, 0, 60000) try: # Check that there are at least one commit log backed up that # is not one of the active commit logs: commitlog_dir = os.path.join(node1.get_path(), 'commitlogs') - debug("node1 commitlog dir: " + commitlog_dir) + logger.debug("node1 commitlog dir: " + commitlog_dir) cluster.flush() - self.assertNotEqual(set(os.listdir(tmp_commitlog)) - set(os.listdir(commitlog_dir)), - set()) + assert_directory_not_empty(tmp_commitlog, commitlog_dir) - debug("Flushing and doing first restart") + logger.debug("Flushing and doing first restart") cluster.compact() node1.drain() # restart the node which causes the active commitlogs to be archived node1.stop() node1.start(wait_for_binary_proto=True) - debug("Stopping and second restart") + logger.debug("Stopping and second restart") node1.stop() node1.start(wait_for_binary_proto=True) @@ -514,7 +521,14 @@ class TestArchiveCommitlog(SnapshotTester): session = self.patient_cql_connection(node1) rows = session.execute('SELECT count(*) from ks.cf') - self.assertEqual(rows[0][0], 60000) + assert rows[0][0] == 60000 finally: - debug("removing tmp_commitlog: " + tmp_commitlog) + logger.debug("removing tmp_commitlog: " + tmp_commitlog) shutil.rmtree(tmp_commitlog) + + +def assert_directory_not_empty(tmp_commitlog, commitlog_dir): + commitlog_dir_ret = set(commitlog_dir) + for tmp_commitlog_file in set(os.listdir(tmp_commitlog)): + commitlog_dir_ret.discard(tmp_commitlog_file) + assert len(commitlog_dir_ret) != 0 http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/snitch_test.py ---------------------------------------------------------------------- diff --git a/snitch_test.py b/snitch_test.py index 334a1a1..164c81e 100644 --- a/snitch_test.py +++ b/snitch_test.py @@ -1,16 +1,17 @@ import os import socket import time - -from nose.plugins.attrib import attr +import pytest +import logging from cassandra import ConsistencyLevel -from dtest import Tester, debug -from nose.tools import assert_true, assert_equal, assert_greater_equal -from tools.decorators import since +from dtest import Tester from tools.jmxutils import (JolokiaAgent, make_mbean, remove_perf_disable_shared_mem) +since = pytest.mark.since +logger = logging.getLogger(__name__) + @since('2.2.5') class TestGossipingPropertyFileSnitch(Tester): @@ -103,21 +104,21 @@ class TestGossipingPropertyFileSnitch(Tester): # read data from node2 just to make sure data and connectivity is OK session = self.patient_exclusive_cql_connection(node2) new_rows = list(session.execute("SELECT * FROM {}".format(stress_table))) - self.assertEquals(original_rows, new_rows) + assert original_rows == new_rows out, err, _ = node1.nodetool('gossipinfo') - self.assertEqual(0, len(err), err) - debug(out) + assert 0 == len(err), err + logger.debug(out) - self.assertIn("/{}".format(NODE1_BROADCAST_ADDRESS), out) - self.assertIn("INTERNAL_IP:{}:{}".format('9' if running40 else '6', NODE1_LISTEN_ADDRESS), out) - self.assertIn("INTERNAL_ADDRESS_AND_PORT:7:{}".format(NODE1_40_LISTEN_ADDRESS), out) - self.assertIn("/{}".format(NODE2_BROADCAST_ADDRESS), out) - self.assertIn("INTERNAL_IP:{}:{}".format('9' if running40 else '6', NODE2_LISTEN_ADDRESS), out) - self.assertIn("INTERNAL_ADDRESS_AND_PORT:7:{}".format(NODE1_40_LISTEN_ADDRESS), out) + assert "/{}".format(NODE1_BROADCAST_ADDRESS) in out + assert "INTERNAL_IP:{}:{}".format('9' if running40 else '6', NODE1_LISTEN_ADDRESS) in out + assert "INTERNAL_ADDRESS_AND_PORT:7:{}".format(NODE1_40_LISTEN_ADDRESS) in out + assert "/{}".format(NODE2_BROADCAST_ADDRESS) in out + assert "INTERNAL_IP:{}:{}".format('9' if running40 else '6', NODE2_LISTEN_ADDRESS) in out + assert "INTERNAL_ADDRESS_AND_PORT:7:{}".format(NODE1_40_LISTEN_ADDRESS) in out class TestDynamicEndpointSnitch(Tester): - @attr('resource-intensive') + @pytest.mark.resource_intensive @since('3.10') def test_multidatacenter_local_quorum(self): ''' @@ -175,20 +176,18 @@ class TestDynamicEndpointSnitch(Tester): for x in range(0, 300): degraded_reads_before = bad_jmx.read_attribute(read_stage, 'Value') scores_before = jmx.read_attribute(des, 'Scores') - assert_true(no_cross_dc(scores_before, [node4, node5, node6]), - "Cross DC scores were present: " + str(scores_before)) + assert no_cross_dc(scores_before, [node4, node5, node6]), "Cross DC scores were present: " + str(scores_before) future = session.execute_async(read_stmt, [x]) future.result() scores_after = jmx.read_attribute(des, 'Scores') - assert_true(no_cross_dc(scores_after, [node4, node5, node6]), - "Cross DC scores were present: " + str(scores_after)) + assert no_cross_dc(scores_after, [node4, node5, node6]), "Cross DC scores were present: " + str(scores_after) if snitchable(scores_before, scores_after, [coordinator_node, healthy_node, degraded_node]): snitchable_count = snitchable_count + 1 # If the DES correctly routed the read around the degraded node, # it shouldn't have another completed read request in metrics - assert_equal(degraded_reads_before, + assert (degraded_reads_before == bad_jmx.read_attribute(read_stage, 'Value')) else: # sleep to give dynamic snitch time to recalculate scores @@ -196,4 +195,4 @@ class TestDynamicEndpointSnitch(Tester): # check that most reads were snitchable, with some # room allowed in case score recalculation is slow - assert_greater_equal(snitchable_count, 250) + assert snitchable_count >= 250 http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/sslnodetonode_test.py ---------------------------------------------------------------------- diff --git a/sslnodetonode_test.py b/sslnodetonode_test.py index 4c4a188..c34fa11 100644 --- a/sslnodetonode_test.py +++ b/sslnodetonode_test.py @@ -2,10 +2,14 @@ import os import os.path import shutil import time +import pytest +import logging from dtest import Tester from tools import sslkeygen -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) # as the error message logged will be different per netty ssl implementation (jdk vs openssl (libre vs boring vs ...)), # the best we can do is just look for a SSLHandshakeException @@ -16,9 +20,8 @@ _LOG_ERR_GENERAL = "javax.net.ssl.SSLException" @since('3.6') class TestNodeToNodeSSLEncryption(Tester): - def ssl_enabled_test(self): + def test_ssl_enabled(self): """Should be able to start with valid ssl options""" - credNode1 = sslkeygen.generate_credentials("127.0.0.1") credNode2 = sslkeygen.generate_credentials("127.0.0.2", credNode1.cakeystore, credNode1.cacert) @@ -26,21 +29,19 @@ class TestNodeToNodeSSLEncryption(Tester): self.cluster.start() self.cql_connection(self.node1) - def ssl_correct_hostname_with_validation_test(self): + def test_ssl_correct_hostname_with_validation(self): """Should be able to start with valid ssl options""" - credNode1 = sslkeygen.generate_credentials("127.0.0.1") credNode2 = sslkeygen.generate_credentials("127.0.0.2", credNode1.cakeystore, credNode1.cacert) self.setup_nodes(credNode1, credNode2, endpoint_verification=True) - self.allow_log_errors = False + self.fixture_dtest_setup.allow_log_errors = False self.cluster.start() time.sleep(2) self.cql_connection(self.node1) - def ssl_wrong_hostname_no_validation_test(self): + def test_ssl_wrong_hostname_no_validation(self): """Should be able to start with valid ssl options""" - credNode1 = sslkeygen.generate_credentials("127.0.0.80") credNode2 = sslkeygen.generate_credentials("127.0.0.81", credNode1.cakeystore, credNode1.cacert) @@ -49,49 +50,46 @@ class TestNodeToNodeSSLEncryption(Tester): time.sleep(2) self.cql_connection(self.node1) - def ssl_wrong_hostname_with_validation_test(self): + def test_ssl_wrong_hostname_with_validation(self): """Should be able to start with valid ssl options""" - credNode1 = sslkeygen.generate_credentials("127.0.0.80") credNode2 = sslkeygen.generate_credentials("127.0.0.81", credNode1.cakeystore, credNode1.cacert) self.setup_nodes(credNode1, credNode2, endpoint_verification=True) - self.allow_log_errors = True + self.fixture_dtest_setup.allow_log_errors = True self.cluster.start(no_wait=True) found = self._grep_msg(self.node1, _LOG_ERR_HANDSHAKE, _LOG_ERR_GENERAL) - self.assertTrue(found) + assert found found = self._grep_msg(self.node2, _LOG_ERR_HANDSHAKE, _LOG_ERR_GENERAL) - self.assertTrue(found) + assert found self.cluster.stop() - def ssl_client_auth_required_fail_test(self): + def test_ssl_client_auth_required_fail(self): """peers need to perform mutual auth (cient auth required), but do not supply the local cert""" - credNode1 = sslkeygen.generate_credentials("127.0.0.1") credNode2 = sslkeygen.generate_credentials("127.0.0.2") self.setup_nodes(credNode1, credNode2, client_auth=True) - self.allow_log_errors = True + self.fixture_dtest_setup.allow_log_errors = True self.cluster.start(no_wait=True) time.sleep(2) found = self._grep_msg(self.node1, _LOG_ERR_HANDSHAKE, _LOG_ERR_GENERAL) - self.assertTrue(found) + assert found found = self._grep_msg(self.node2, _LOG_ERR_HANDSHAKE, _LOG_ERR_GENERAL) - self.assertTrue(found) + assert found self.cluster.stop() - self.assertTrue(found) + assert found - def ssl_client_auth_required_succeed_test(self): + def test_ssl_client_auth_required_succeed(self): """peers need to perform mutual auth (cient auth required), but do not supply the loca cert""" - credNode1 = sslkeygen.generate_credentials("127.0.0.1") credNode2 = sslkeygen.generate_credentials("127.0.0.2", credNode1.cakeystore, credNode1.cacert) sslkeygen.import_cert(credNode1.basedir, 'ca127.0.0.2', credNode2.cacert, credNode1.cakeystore) @@ -102,23 +100,22 @@ class TestNodeToNodeSSLEncryption(Tester): self.cluster.start() self.cql_connection(self.node1) - def ca_mismatch_test(self): + def test_ca_mismatch(self): """CA mismatch should cause nodes to fail to connect""" - credNode1 = sslkeygen.generate_credentials("127.0.0.1") credNode2 = sslkeygen.generate_credentials("127.0.0.2") # mismatching CA! self.setup_nodes(credNode1, credNode2) - self.allow_log_errors = True + self.fixture_dtest_setup.allow_log_errors = True self.cluster.start(no_wait=True) found = self._grep_msg(self.node1, _LOG_ERR_HANDSHAKE) self.cluster.stop() - self.assertTrue(found) + assert found @since('4.0') - def optional_outbound_tls_test(self): + def test_optional_outbound_tls(self): """listen on TLS port, but optionally connect using TLS. this supports the upgrade case of starting with a non-encrypted cluster and then upgrading each node to use encryption. @jira_ticket CASSANDRA-10404 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org