Repository: cassandra-dtest
Updated Branches:
  refs/heads/master 6d5ee3792 -> ac9c95607
Add tests for mixed version batchlog replay Patch by Jeff Jirsa; reviewed by Aleksey Yeschenko Project: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/commit/ac9c9560 Tree: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/tree/ac9c9560 Diff: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/diff/ac9c9560 Branch: refs/heads/master Commit: ac9c95607ce439de596da41c368d79c67d6dcdda Parents: 6d5ee37 Author: Jeff Jirsa <jji...@apple.com> Authored: Mon Aug 14 12:55:17 2017 -0700 Committer: Aleksey Yeschenko <alek...@yeschenko.com> Committed: Sat Aug 26 01:21:00 2017 +0100 ---------------------------------------------------------------------- batch_test.py | 96 +++++++++++++++++++++++------- byteman/fail_after_batchlog_write.btm | 19 ++++++ 2 files changed, 94 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/ac9c9560/batch_test.py ---------------------------------------------------------------------- diff --git a/batch_test.py b/batch_test.py index 4194f10..5c25c46 100644 --- a/batch_test.py +++ b/batch_test.py @@ -1,6 +1,7 @@ import sys import time from unittest import skipIf +from nose.tools import assert_greater_equal from cassandra import ConsistencyLevel, Timeout, Unavailable from cassandra.query import SimpleStatement @@ -9,6 +10,8 @@ from dtest import Tester, create_ks, debug from tools.assertions import (assert_all, assert_invalid, assert_one, assert_unavailable) from tools.decorators import since +from tools.jmxutils import (JolokiaAgent, make_mbean, + remove_perf_disable_shared_mem) class TestBatch(Tester): @@ -295,6 +298,15 @@ class TestBatch(Tester): self._logged_batch_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4) @since('3.0', max_version='3.x') + def batchlog_replay_compatibility_1_test(self): + """ + @jira_ticket CASSANDRA-9673, test that logged 
batches still work with a mixed version cluster. + + Here we have one 3.0/3.x node and two 2.2 nodes and we send the batch request to the 3.0 node. + """ + self._batchlog_replay_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4) + + @since('3.0', max_version='3.x') @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+') def logged_batch_compatibility_2_test(self): """ @@ -324,6 +336,15 @@ class TestBatch(Tester): self._logged_batch_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4) @since('3.0', max_version='3.x') + def batchlog_replay_compatibility_4_test(self): + """ + @jira_ticket CASSANDRA-9673, test that logged batches still work with a mixed version cluster. + + Here we have two 3.0/3.x nodes and one 2.2 node and we send the batch request to the 2.2 node. + """ + self._batchlog_replay_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4) + + @since('3.0', max_version='3.x') @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+') def logged_batch_compatibility_5_test(self): """ @@ -346,6 +367,43 @@ class TestBatch(Tester): res = sorted(rows) self.assertEquals([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(res[0]), list(res[1])]) + def _batchlog_replay_compatibility_test(self, coordinator_idx, current_nodes, previous_version, previous_nodes, protocol_version): + session = self.prepare_mixed(coordinator_idx, current_nodes, previous_version, previous_nodes, + protocol_version=protocol_version, install_byteman=True) + + coordinator = self.cluster.nodelist()[coordinator_idx] + coordinator.byteman_submit(['./byteman/fail_after_batchlog_write.btm']) + debug("Injected byteman scripts to enable batchlog replay {}".format(coordinator.name)) + + query = """ + BEGIN BATCH + INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow') + INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner') + APPLY BATCH + """ + session.execute(query) + + total_batches_replayed = 0 
+ blm = make_mbean('db', type='BatchlogManager') + + for n in self.cluster.nodelist(): + if n == coordinator: + continue + + with JolokiaAgent(n) as jmx: + debug('Forcing batchlog replay for {}'.format(n.name)) + jmx.execute_method(blm, 'forceBatchlogReplay') + batches_replayed = jmx.read_attribute(blm, 'TotalBatchesReplayed') + debug('{} batches replayed on node {}'.format(batches_replayed, n.name)) + total_batches_replayed += batches_replayed + + assert_greater_equal(total_batches_replayed, 2) + + for node in self.cluster.nodelist(): + session = self.patient_exclusive_cql_connection(node, protocol_version=protocol_version) + rows = sorted(session.execute('SELECT id, firstname, lastname FROM ks.users')) + self.assertEqual([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(rows[0]), list(rows[1])]) + def assert_timedout(self, session, query, cl, acknowledged_by=None, received_responses=None): try: @@ -366,15 +424,17 @@ class TestBatch(Tester): else: assert False, "Expecting TimedOutException but no exception was raised" - def prepare(self, nodes=1, compression=True, version=None, protocol_version=None): - if not self.cluster.nodelist(): - self.cluster.populate(nodes) - if version: - for node in self.cluster.nodelist(): - node.set_install_dir(version=version) - debug("Set cassandra dir for {} to {}".format(node.name, node.get_install_dir())) + def prepare(self, nodes=1, compression=True, version=None, protocol_version=None, install_byteman=False): + if version: + self.cluster.set_install_dir(version=version) + debug("Set cassandra dir to {}".format(self.cluster.get_install_dir())) + + self.cluster.populate(nodes, install_byteman=install_byteman) + + for n in self.cluster.nodelist(): + remove_perf_disable_shared_mem(n) - self.cluster.start(wait_other_notice=True) + self.cluster.start(wait_other_notice=True) node1 = self.cluster.nodelist()[0] session = self.patient_cql_connection(node1, protocol_version=protocol_version) @@ -405,13 +465,12 @@ class 
TestBatch(Tester): time.sleep(.5) - def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True, protocol_version=None): - + def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True, protocol_version=None, install_byteman=False): debug("Testing with {} node(s) at version '{}', {} node(s) at current version" .format(previous_nodes, previous_version, current_nodes)) # start a cluster using the previous version - self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version) + self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version, install_byteman=install_byteman) # then upgrade the current nodes to the current version but not the previous nodes for i in xrange(current_nodes): @@ -426,18 +485,13 @@ class TestBatch(Tester): """ Upgrade a node to the current version """ - debug('Upgrading {}'.format(node.name)) - - debug('Shutting down node: ' + node.name) - node.drain() - node.watch_log_for("DRAINED") + debug('Upgrading {} to the current version'.format(node.name)) + debug('Shutting down {}'.format(node.name)) node.stop(wait_other_notice=False) - self.set_node_to_current_version(node) - debug("Set new cassandra dir for {}: {}".format(node.name, node.get_install_dir())) - + debug("Set cassandra dir for {} to {}".format(node.name, node.get_install_dir())) + # needed for jmx + remove_perf_disable_shared_mem(node) # Restart nodes on new version debug('Starting {} on new version ({})'.format(node.name, node.get_cassandra_version())) node.start(wait_other_notice=True, wait_for_binary_proto=True) - debug('Upgrading sstables') - node.nodetool('upgradesstables -a') http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/ac9c9560/byteman/fail_after_batchlog_write.btm ---------------------------------------------------------------------- diff --git 
a/byteman/fail_after_batchlog_write.btm b/byteman/fail_after_batchlog_write.btm new file mode 100644 index 0000000..8574b00 --- /dev/null +++ b/byteman/fail_after_batchlog_write.btm @@ -0,0 +1,19 @@ +# +# Inject node failure immediately after batchlog write. +# Method signature required in 3.x to avoid pausing before legacy mutations sent +# +RULE skip writing batched mutations +CLASS org.apache.cassandra.service.StorageProxy +METHOD syncWriteBatchedMutations +AT ENTRY +IF TRUE +DO return +ENDRULE + +RULE skip removing from batchlog +CLASS org.apache.cassandra.service.StorageProxy +METHOD asyncRemoveFromBatchlog +AT ENTRY +IF TRUE +DO return +ENDRULE --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org