http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/materialized_views_test.py
----------------------------------------------------------------------
diff --git a/materialized_views_test.py b/materialized_views_test.py
index 22c69c8..8d38ee8 100644
--- a/materialized_views_test.py
+++ b/materialized_views_test.py
@@ -3,30 +3,34 @@ import re
 import sys
 import time
 import traceback
+import pytest
+import threading
+import logging
+
+from flaky import flaky
+from enum import Enum
+from queue import Empty
 from functools import partial
 from multiprocessing import Process, Queue
-from unittest import skip, skipIf
 
 from cassandra import ConsistencyLevel, InvalidRequest, WriteFailure
 from cassandra.cluster import NoHostAvailable
 from cassandra.concurrent import execute_concurrent_with_args
 from cassandra.cluster import Cluster
 from cassandra.query import SimpleStatement
-# TODO add in requirements.txt
-from enum import Enum  # Remove when switching to py3
-from nose.plugins.attrib import attr
-from nose.tools import (assert_equal)
 
 from distutils.version import LooseVersion
-from dtest import Tester, debug, get_ip_from_node, create_ks, 
supports_v5_protocol
+from dtest import Tester, get_ip_from_node, create_ks
 from tools.assertions import (assert_all, assert_crc_check_chance_equal,
                               assert_invalid, assert_none, assert_one,
                               assert_unavailable)
 from tools.data import rows_to_list
-from tools.decorators import since
 from tools.misc import new_node
 from tools.jmxutils import (JolokiaAgent, make_mbean, 
remove_perf_disable_shared_mem)
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 # CASSANDRA-10978. Migration wait (in seconds) to use in bootstrapping tests. 
Needed to handle
 # pathological case of flushing schema keyspace for multiple data directories. 
See CASSANDRA-6696
 # for multiple data directory changes and CASSANDRA-10421 for compaction 
logging that must be
@@ -34,6 +38,7 @@ from tools.jmxutils import (JolokiaAgent, make_mbean, 
remove_perf_disable_shared
 MIGRATION_WAIT = 5
 
 
+@flaky
 @since('3.0')
 class TestMaterializedViews(Tester):
     """
@@ -80,7 +85,7 @@ class TestMaterializedViews(Tester):
             self.cluster.compact()
 
     def _settle_nodes(self):
-        debug("Settling all nodes")
+        logger.debug("Settling all nodes")
         stage_match = 
re.compile("(?P<name>\S+)\s+(?P<active>\d+)\s+(?P<pending>\d+)\s+(?P<completed>\d+)\s+(?P<blocked>\d+)\s+(?P<alltimeblocked>\d+)")
 
         def _settled_stages(node):
@@ -92,7 +97,7 @@ class TestMaterializedViews(Tester):
                     active = int(match.group('active'))
                     pending = int(match.group('pending'))
                     if active != 0 or pending != 0:
-                        debug("%s - pool %s still has %d active and %d 
pending" % (node.name, match.group("name"), active, pending))
+                        logger.debug("%s - pool %s still has %d active and %d 
pending" % (node.name, match.group("name"), active, pending))
                         return False
             return True
 
@@ -111,7 +116,7 @@ class TestMaterializedViews(Tester):
             return 'system.views_builds_in_progress'
 
     def _wait_for_view(self, ks, view):
-        debug("waiting for view")
+        logger.debug("waiting for view")
 
         def _view_build_finished(node):
             s = self.patient_exclusive_cql_connection(node)
@@ -137,7 +142,7 @@ class TestMaterializedViews(Tester):
                 query = "SELECT COUNT(*) FROM %s WHERE keyspace_name='%s' AND 
view_name='%s'" %\
                         (self._build_progress_table(), ks, view)
                 result = list(session.execute(query))
-                self.assertEqual(result[0].count, 0)
+                assert 0 == result[0].count
             except AssertionError:
                 break
 
@@ -157,21 +162,20 @@ class TestMaterializedViews(Tester):
     def _replay_batchlogs(self):
         for node in self.cluster.nodelist():
             if node.is_running():
-                debug("Replaying batchlog on node {}".format(node.name))
+                logger.debug("Replaying batchlog on node {}".format(node.name))
                 node.nodetool("replaybatchlog")
                 # CASSANDRA-13069 - Ensure replayed mutations are removed from 
the batchlog
                 node_session = self.patient_exclusive_cql_connection(node)
                 result = list(node_session.execute("SELECT count(*) FROM 
system.batches;"))
-                self.assertEqual(result[0].count, 0)
+                assert result[0].count == 0
 
-    def create_test(self):
+    def test_create(self):
         """Test the materialized view creation"""
-
         session = self.prepare(user_table=True)
 
         result = list(session.execute(("SELECT * FROM system_schema.views "
                                        "WHERE keyspace_name='ks' AND 
base_table_name='users' ALLOW FILTERING")))
-        self.assertEqual(len(result), 1, "Expecting 1 materialized view, got" 
+ str(result))
+        assert len(result) == 1, "Expecting 1 materialized view, got" + 
str(result)
 
     def test_gcgs_validation(self):
         """Verify that it's not possible to create or set a too low 
gc_grace_seconds on MVs"""
@@ -211,73 +215,69 @@ class TestMaterializedViews(Tester):
                        "updates. Setting gc_grace_seconds too low might cause 
undelivered updates"
                        " to expire before being replayed.")
 
-    def insert_test(self):
+    def test_insert(self):
         """Test basic insertions"""
-
         session = self.prepare(user_table=True)
 
         self._insert_data(session)
 
         result = list(session.execute("SELECT * FROM users;"))
-        self.assertEqual(len(result), 4, "Expecting {} users, got 
{}".format(4, len(result)))
+        assert len(result) == 4, "Expecting {} users, got {}".format(4, 
len(result))
 
         result = list(session.execute("SELECT * FROM users_by_state WHERE 
state='TX';"))
-        self.assertEqual(len(result), 2, "Expecting {} users, got 
{}".format(2, len(result)))
+        assert len(result) == 2, "Expecting {} users, got {}".format(2, 
len(result))
 
         result = list(session.execute("SELECT * FROM users_by_state WHERE 
state='CA';"))
-        self.assertEqual(len(result), 1, "Expecting {} users, got 
{}".format(1, len(result)))
+        assert len(result) == 1, "Expecting {} users, got {}".format(1, 
len(result))
 
         result = list(session.execute("SELECT * FROM users_by_state WHERE 
state='MA';"))
-        self.assertEqual(len(result), 0, "Expecting {} users, got 
{}".format(0, len(result)))
+        assert len(result) == 0, "Expecting {} users, got {}".format(0, 
len(result))
 
-    def populate_mv_after_insert_test(self):
+    def test_populate_mv_after_insert(self):
         """Test that a view is OK when created with existing data"""
-
         session = self.prepare(consistency_level=ConsistencyLevel.QUORUM)
 
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
 
-        for i in xrange(1000):
+        for i in range(1000):
             session.execute("INSERT INTO t (id, v) VALUES ({v}, 
{v})".format(v=i))
 
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t 
WHERE v IS NOT NULL "
                          "AND id IS NOT NULL PRIMARY KEY (v, id)"))
 
-        debug("wait for view to build")
+        logger.debug("wait for view to build")
         self._wait_for_view("ks", "t_by_v")
 
-        debug("wait that all batchlogs are replayed")
+        logger.debug("wait that all batchlogs are replayed")
         self._replay_batchlogs()
 
-        for i in xrange(1000):
+        for i in range(1000):
             assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(i), 
[i, i])
 
-    def populate_mv_after_insert_wide_rows_test(self):
+    def test_populate_mv_after_insert_wide_rows(self):
         """Test that a view is OK when created with existing data with wide 
rows"""
-
         session = self.prepare(consistency_level=ConsistencyLevel.QUORUM)
 
         session.execute("CREATE TABLE t (id int, v int, PRIMARY KEY (id, v))")
 
-        for i in xrange(5):
-            for j in xrange(10000):
+        for i in range(5):
+            for j in range(10000):
                 session.execute("INSERT INTO t (id, v) VALUES ({}, 
{})".format(i, j))
 
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t 
WHERE v IS NOT NULL "
                          "AND id IS NOT NULL PRIMARY KEY (v, id)"))
 
-        debug("wait for view to build")
+        logger.debug("wait for view to build")
         self._wait_for_view("ks", "t_by_v")
 
-        debug("wait that all batchlogs are replayed")
+        logger.debug("wait that all batchlogs are replayed")
         self._replay_batchlogs()
-        for i in xrange(5):
-            for j in xrange(10000):
+        for i in range(5):
+            for j in range(10000):
                 assert_one(session, "SELECT * FROM t_by_v WHERE id = {} AND v 
= {}".format(i, j), [j, i])
 
-    def crc_check_chance_test(self):
+    def test_crc_check_chance(self):
         """Test that crc_check_chance parameter is properly populated after mv 
creation and update"""
-
         session = self.prepare()
 
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
@@ -290,9 +290,8 @@ class TestMaterializedViews(Tester):
 
         assert_crc_check_chance_equal(session, "t_by_v", 0.3, view=True)
 
-    def prepared_statement_test(self):
+    def test_prepared_statement(self):
         """Test basic insertions with prepared statement"""
-
         session = self.prepare(user_table=True)
 
         insertPrepared = session.prepare(
@@ -309,20 +308,19 @@ class TestMaterializedViews(Tester):
         session.execute(insertPrepared.bind(('user4', 'ch@ngem3d', 'm', 'TX', 
1974)))
 
         result = list(session.execute("SELECT * FROM users;"))
-        self.assertEqual(len(result), 4, "Expecting {} users, got 
{}".format(4, len(result)))
+        assert len(result) == 4, "Expecting {} users, got {}".format(4, 
len(result))
 
         result = list(session.execute(selectPrepared.bind(['TX'])))
-        self.assertEqual(len(result), 2, "Expecting {} users, got 
{}".format(2, len(result)))
+        assert len(result) == 2, "Expecting {} users, got {}".format(2, 
len(result))
 
         result = list(session.execute(selectPrepared.bind(['CA'])))
-        self.assertEqual(len(result), 1, "Expecting {} users, got 
{}".format(1, len(result)))
+        assert len(result) == 1, "Expecting {} users, got {}".format(1, 
len(result))
 
         result = list(session.execute(selectPrepared.bind(['MA'])))
-        self.assertEqual(len(result), 0, "Expecting {} users, got 
{}".format(0, len(result)))
+        assert len(result) == 0, "Expecting {} users, got {}".format(0, 
len(result))
 
-    def immutable_test(self):
+    def test_immutable(self):
         """Test that a materialized view is immutable"""
-
         session = self.prepare(user_table=True)
 
         # cannot insert
@@ -345,9 +343,8 @@ class TestMaterializedViews(Tester):
         assert_invalid(session, "ALTER TABLE users_by_state ADD first_name 
varchar",
                        "Cannot use ALTER TABLE on Materialized View")
 
-    def drop_mv_test(self):
+    def test_drop_mv(self):
         """Test that we can drop a view properly"""
-
         session = self.prepare(user_table=True)
 
         # create another materialized view
@@ -357,22 +354,21 @@ class TestMaterializedViews(Tester):
 
         result = list(session.execute(("SELECT * FROM system_schema.views "
                                        "WHERE keyspace_name='ks' AND 
base_table_name='users' ALLOW FILTERING")))
-        self.assertEqual(len(result), 2, "Expecting {} materialized view, got 
{}".format(2, len(result)))
+        assert len(result) == 2, "Expecting {} materialized view, got 
{}".format(2, len(result))
 
         session.execute("DROP MATERIALIZED VIEW ks.users_by_state;")
 
         result = list(session.execute(("SELECT * FROM system_schema.views "
                                        "WHERE keyspace_name='ks' AND 
base_table_name='users' ALLOW FILTERING")))
-        self.assertEqual(len(result), 1, "Expecting {} materialized view, got 
{}".format(1, len(result)))
+        assert len(result) == 1, "Expecting {} materialized view, got 
{}".format(1, len(result))
 
-    def drop_column_test(self):
+    def test_drop_column(self):
         """Test that we cannot drop a column if it is used by a MV"""
-
         session = self.prepare(user_table=True)
 
         result = list(session.execute(("SELECT * FROM system_schema.views "
                                        "WHERE keyspace_name='ks' AND 
base_table_name='users' ALLOW FILTERING")))
-        self.assertEqual(len(result), 1, "Expecting {} materialized view, got 
{}".format(1, len(result)))
+        assert len(result) == 1, "Expecting {} materialized view, got 
{}".format(1, len(result))
 
         assert_invalid(
             session,
@@ -380,17 +376,13 @@ class TestMaterializedViews(Tester):
             "Cannot drop column state on base table with materialized views."
         )
 
-    def drop_table_test(self):
+    def test_drop_table(self):
         """Test that we cannot drop a table without deleting its MVs first"""
-
         session = self.prepare(user_table=True)
 
         result = list(session.execute(("SELECT * FROM system_schema.views "
                                        "WHERE keyspace_name='ks' AND 
base_table_name='users' ALLOW FILTERING")))
-        self.assertEqual(
-            len(result), 1,
-            "Expecting {} materialized view, got {}".format(1, len(result))
-        )
+        assert len(result) == 1, "Expecting {} materialized view, got 
{}".format(1, len(result))
 
         assert_invalid(
             session,
@@ -400,24 +392,17 @@ class TestMaterializedViews(Tester):
 
         result = list(session.execute(("SELECT * FROM system_schema.views "
                                        "WHERE keyspace_name='ks' AND 
base_table_name='users' ALLOW FILTERING")))
-        self.assertEqual(
-            len(result), 1,
-            "Expecting {} materialized view, got {}".format(1, len(result))
-        )
+        assert len(result) == 1, "Expecting {} materialized view, got 
{}".format(1, len(result))
 
         session.execute("DROP MATERIALIZED VIEW ks.users_by_state;")
         session.execute("DROP TABLE ks.users;")
 
         result = list(session.execute(("SELECT * FROM system_schema.views "
                                        "WHERE keyspace_name='ks' AND 
base_table_name='users' ALLOW FILTERING")))
-        self.assertEqual(
-            len(result), 0,
-            "Expecting {} materialized view, got {}".format(1, len(result))
-        )
+        assert len(result) == 0, "Expecting {} materialized view, got 
{}".format(0, len(result))
 
-    def clustering_column_test(self):
+    def test_clustering_column(self):
         """Test that we can use clustering columns as primary key for a 
materialized view"""
-
         session = self.prepare(consistency_level=ConsistencyLevel.QUORUM)
 
         session.execute(("CREATE TABLE users (username varchar, password 
varchar, gender varchar, "
@@ -434,10 +419,10 @@ class TestMaterializedViews(Tester):
         self._insert_data(session)
 
         result = list(session.execute("SELECT * FROM 
ks.users_by_state_birth_year WHERE state='TX'"))
-        self.assertEqual(len(result), 2, "Expecting {} users, got 
{}".format(2, len(result)))
+        assert len(result) == 2, "Expecting {} users, got {}".format(2, 
len(result))
 
         result = list(session.execute("SELECT * FROM 
ks.users_by_state_birth_year WHERE state='TX' AND birth_year=1968"))
-        self.assertEqual(len(result), 1, "Expecting {} users, got 
{}".format(1, len(result)))
+        assert len(result) == 1, "Expecting {} users, got {}".format(1, 
len(result))
 
     def _add_dc_after_mv_test(self, rf):
         """
@@ -448,47 +433,47 @@ class TestMaterializedViews(Tester):
 
         session = self.prepare(rf=rf)
 
-        debug("Creating schema")
+        logger.debug("Creating schema")
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        debug("Writing 1k to base")
-        for i in xrange(1000):
+        logger.debug("Writing 1k to base")
+        for i in range(1000):
             session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=-i))
 
-        debug("Reading 1k from view")
-        for i in xrange(1000):
+        logger.debug("Reading 1k from view")
+        for i in range(1000):
             assert_one(session, "SELECT * FROM t_by_v WHERE v = 
{}".format(-i), [-i, i])
 
-        debug("Reading 1k from base")
-        for i in xrange(1000):
+        logger.debug("Reading 1k from base")
+        for i in range(1000):
             assert_one(session, "SELECT * FROM t WHERE id = {}".format(i), [i, 
-i])
 
-        debug("Bootstrapping new node in another dc")
+        logger.debug("Bootstrapping new node in another dc")
         node4 = new_node(self.cluster, data_center='dc2')
         node4.start(wait_other_notice=True, wait_for_binary_proto=True, 
jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
 
-        debug("Bootstrapping new node in another dc")
+        logger.debug("Bootstrapping new node in another dc")
         node5 = new_node(self.cluster, remote_debug_port='1414', 
data_center='dc2')
         
node5.start(jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
 
         session2 = self.patient_exclusive_cql_connection(node4)
 
-        debug("Verifying data from new node in view")
-        for i in xrange(1000):
+        logger.debug("Verifying data from new node in view")
+        for i in range(1000):
             assert_one(session2, "SELECT * FROM ks.t_by_v WHERE v = 
{}".format(-i), [-i, i])
 
-        debug("Inserting 100 into base")
-        for i in xrange(1000, 1100):
+        logger.debug("Inserting 100 into base")
+        for i in range(1000, 1100):
             session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=-i))
 
-        debug("Verify 100 in view")
-        for i in xrange(1000, 1100):
+        logger.debug("Verify 100 in view")
+        for i in range(1000, 1100):
             assert_one(session, "SELECT * FROM t_by_v WHERE v = 
{}".format(-i), [-i, i])
 
-    @attr('resource-intensive')
-    def add_dc_after_mv_simple_replication_test(self):
+    @pytest.mark.resource_intensive
+    def test_add_dc_after_mv_simple_replication(self):
         """
         @jira_ticket CASSANDRA-10634
 
@@ -497,8 +482,8 @@ class TestMaterializedViews(Tester):
 
         self._add_dc_after_mv_test(1)
 
-    @attr('resource-intensive')
-    def add_dc_after_mv_network_replication_test(self):
+    @pytest.mark.resource_intensive
+    def test_add_dc_after_mv_network_replication(self):
         """
         @jira_ticket CASSANDRA-10634
 
@@ -507,8 +492,8 @@ class TestMaterializedViews(Tester):
 
         self._add_dc_after_mv_test({'dc1': 1, 'dc2': 1})
 
-    @attr('resource-intensive')
-    def add_node_after_mv_test(self):
+    @pytest.mark.resource_intensive
+    def test_add_node_after_mv(self):
         """
         @jira_ticket CASSANDRA-10978
 
@@ -521,10 +506,10 @@ class TestMaterializedViews(Tester):
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        for i in xrange(1000):
+        for i in range(1000):
             session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=-i))
 
-        for i in xrange(1000):
+        for i in range(1000):
             assert_one(session, "SELECT * FROM t_by_v WHERE v = 
{}".format(-i), [-i, i])
 
         node4 = new_node(self.cluster)
@@ -539,17 +524,17 @@ class TestMaterializedViews(Tester):
         """
         assert_one(session2, "SELECT count(*) FROM system.built_views WHERE 
keyspace_name = 'ks' AND view_name = 't_by_v'", [1])
 
-        for i in xrange(1000):
+        for i in range(1000):
             assert_one(session2, "SELECT * FROM ks.t_by_v WHERE v = 
{}".format(-i), [-i, i])
 
-        for i in xrange(1000, 1100):
+        for i in range(1000, 1100):
             session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=-i))
 
-        for i in xrange(1000, 1100):
+        for i in range(1000, 1100):
             assert_one(session, "SELECT * FROM t_by_v WHERE v = 
{}".format(-i), [-i, i])
 
-    @attr('resource-intensive')
-    def add_node_after_wide_mv_with_range_deletions_test(self):
+    @pytest.mark.resource_intensive
+    def test_add_node_after_wide_mv_with_range_deletions(self):
         """
         @jira_ticket CASSANDRA-11670
 
@@ -562,26 +547,26 @@ class TestMaterializedViews(Tester):
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        for i in xrange(10):
-            for j in xrange(100):
+        for i in range(10):
+            for j in range(100):
                 session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=j))
 
         self.cluster.flush()
 
-        for i in xrange(10):
-            for j in xrange(100):
+        for i in range(10):
+            for j in range(100):
                 assert_one(session, "SELECT * FROM t WHERE id = {} and v = 
{}".format(i, j), [i, j])
                 assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v 
= {}".format(i, j), [j, i])
 
-        for i in xrange(10):
-            for j in xrange(100):
+        for i in range(10):
+            for j in range(100):
                 if j % 10 == 0:
                     session.execute("DELETE FROM t WHERE id = {} AND v >= {} 
and v < {}".format(i, j, j + 2))
 
         self.cluster.flush()
 
-        for i in xrange(10):
-            for j in xrange(100):
+        for i in range(10):
+            for j in range(100):
                 if j % 10 == 0 or (j - 1) % 10 == 0:
                     assert_none(session, "SELECT * FROM t WHERE id = {} and v 
= {}".format(i, j))
                     assert_none(session, "SELECT * FROM t_by_v WHERE id = {} 
and v = {}".format(i, j))
@@ -591,13 +576,13 @@ class TestMaterializedViews(Tester):
 
         node4 = new_node(self.cluster)
         node4.set_configuration_options(values={'max_mutation_size_in_kb': 
20})  # CASSANDRA-11670
-        debug("Start join at {}".format(time.strftime("%H:%M:%S")))
+        logger.debug("Start join at {}".format(time.strftime("%H:%M:%S")))
         node4.start(wait_for_binary_proto=True, 
jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
 
         session2 = self.patient_exclusive_cql_connection(node4)
 
-        for i in xrange(10):
-            for j in xrange(100):
+        for i in range(10):
+            for j in range(100):
                 if j % 10 == 0 or (j - 1) % 10 == 0:
                     assert_none(session2, "SELECT * FROM ks.t WHERE id = {} 
and v = {}".format(i, j))
                     assert_none(session2, "SELECT * FROM ks.t_by_v WHERE id = 
{} and v = {}".format(i, j))
@@ -605,12 +590,12 @@ class TestMaterializedViews(Tester):
                     assert_one(session2, "SELECT * FROM ks.t WHERE id = {} and 
v = {}".format(i, j), [i, j])
                     assert_one(session2, "SELECT * FROM ks.t_by_v WHERE id = 
{} and v = {}".format(i, j), [j, i])
 
-        for i in xrange(10):
-            for j in xrange(100, 110):
+        for i in range(10):
+            for j in range(100, 110):
                 session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=j))
 
-        for i in xrange(10):
-            for j in xrange(110):
+        for i in range(10):
+            for j in range(110):
                 if j < 100 and (j % 10 == 0 or (j - 1) % 10 == 0):
                     assert_none(session2, "SELECT * FROM ks.t WHERE id = {} 
and v = {}".format(i, j))
                     assert_none(session2, "SELECT * FROM ks.t_by_v WHERE id = 
{} and v = {}".format(i, j))
@@ -618,8 +603,8 @@ class TestMaterializedViews(Tester):
                     assert_one(session2, "SELECT * FROM ks.t WHERE id = {} and 
v = {}".format(i, j), [i, j])
                     assert_one(session2, "SELECT * FROM ks.t_by_v WHERE id = 
{} and v = {}".format(i, j), [j, i])
 
-    @attr('resource-intensive')
-    def add_node_after_very_wide_mv_test(self):
+    @pytest.mark.resource_intensive
+    def test_add_node_after_very_wide_mv(self):
         """
         @jira_ticket CASSANDRA-11670
 
@@ -632,37 +617,37 @@ class TestMaterializedViews(Tester):
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        for i in xrange(5):
-            for j in xrange(5000):
+        for i in range(5):
+            for j in range(5000):
                 session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=j))
 
         self.cluster.flush()
 
-        for i in xrange(5):
-            for j in xrange(5000):
+        for i in range(5):
+            for j in range(5000):
                 assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v 
= {}".format(i, j), [j, i])
 
         node4 = new_node(self.cluster)
         node4.set_configuration_options(values={'max_mutation_size_in_kb': 
20})  # CASSANDRA-11670
-        debug("Start join at {}".format(time.strftime("%H:%M:%S")))
+        logger.debug("Start join at {}".format(time.strftime("%H:%M:%S")))
         node4.start(wait_for_binary_proto=True, 
jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
 
         session2 = self.patient_exclusive_cql_connection(node4)
 
-        for i in xrange(5):
-            for j in xrange(5000):
+        for i in range(5):
+            for j in range(5000):
                 assert_one(session2, "SELECT * FROM ks.t_by_v WHERE id = {} 
and v = {}".format(i, j), [j, i])
 
-        for i in xrange(5):
-            for j in xrange(5100):
+        for i in range(5):
+            for j in range(5100):
                 session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=j))
 
-        for i in xrange(5):
-            for j in xrange(5100):
+        for i in range(5):
+            for j in range(5100):
                 assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v 
= {}".format(i, j), [j, i])
 
-    @attr('resource-intensive')
-    def add_write_survey_node_after_mv_test(self):
+    @pytest.mark.resource_intensive
+    def test_add_write_survey_node_after_mv(self):
         """
         @jira_ticket CASSANDRA-10621
         @jira_ticket CASSANDRA-10978
@@ -676,24 +661,23 @@ class TestMaterializedViews(Tester):
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        for i in xrange(1000):
+        for i in range(1000):
             session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=-i))
 
-        for i in xrange(1000):
+        for i in range(1000):
             assert_one(session, "SELECT * FROM t_by_v WHERE v = 
{}".format(-i), [-i, i])
 
         node4 = new_node(self.cluster)
         node4.start(wait_for_binary_proto=True, 
jvm_args=["-Dcassandra.write_survey=true", 
"-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])
 
-        for i in xrange(1000, 1100):
+        for i in range(1000, 1100):
             session.execute("INSERT INTO t (id, v) VALUES ({id}, 
{v})".format(id=i, v=-i))
 
-        for i in xrange(1100):
+        for i in range(1100):
             assert_one(session, "SELECT * FROM t_by_v WHERE v = 
{}".format(-i), [-i, i])
 
-    def allow_filtering_test(self):
+    def test_allow_filtering(self):
         """Test that allow filtering works as usual for a materialized view"""
-
         session = self.prepare()
 
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, 
v3 decimal)")
@@ -702,19 +686,19 @@ class TestMaterializedViews(Tester):
         session.execute(("CREATE MATERIALIZED VIEW t_by_v2 AS SELECT * FROM t "
                          "WHERE v2 IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v2, id)"))
 
-        for i in xrange(1000):
+        for i in range(1000):
             session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 
'a', 3.0)".format(v=i))
 
-        for i in xrange(1000):
+        for i in range(1000):
             assert_one(session, "SELECT * FROM t_by_v WHERE v = 
{v}".format(v=i), [i, i, 'a', 3.0])
 
         rows = list(session.execute("SELECT * FROM t_by_v2 WHERE v2 = 'a'"))
-        self.assertEqual(len(rows), 1000, "Expected 1000 rows but got 
{}".format(len(rows)))
+        assert len(rows) == 1000, "Expected 1000 rows but got 
{}".format(len(rows))
 
         assert_invalid(session, "SELECT * FROM t_by_v WHERE v = 1 AND v2 = 
'a'")
         assert_invalid(session, "SELECT * FROM t_by_v2 WHERE v2 = 'a' AND v = 
1")
 
-        for i in xrange(1000):
+        for i in range(1000):
             assert_one(
                 session,
                 "SELECT * FROM t_by_v WHERE v = {} AND v3 = 3.0 ALLOW 
FILTERING".format(i),
@@ -726,9 +710,8 @@ class TestMaterializedViews(Tester):
                 ['a', i, i, 3.0]
             )
 
-    def secondary_index_test(self):
+    def test_secondary_index(self):
         """Test that secondary indexes cannot be created on a materialized 
view"""
-
         session = self.prepare()
 
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, 
v3 decimal)")
@@ -737,34 +720,32 @@ class TestMaterializedViews(Tester):
         assert_invalid(session, "CREATE INDEX ON t_by_v (v2)",
                        "Secondary indexes are not supported on materialized 
views")
 
-    def ttl_test(self):
+    def test_ttl(self):
         """
         Test that TTL works as expected for a materialized view
         @expected_result The TTL is propagated properly between tables.
         """
-
         session = self.prepare()
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 int, v3 
int)")
         session.execute(("CREATE MATERIALIZED VIEW t_by_v2 AS SELECT * FROM t "
                          "WHERE v2 IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v2, id)"))
 
-        for i in xrange(100):
+        for i in range(100):
             session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 
{v}, {v}) USING TTL 10".format(v=i))
 
-        for i in xrange(100):
+        for i in range(100):
             assert_one(session, "SELECT * FROM t_by_v2 WHERE v2 = 
{}".format(i), [i, i, i, i])
 
         time.sleep(20)
 
         rows = list(session.execute("SELECT * FROM t_by_v2"))
-        self.assertEqual(len(rows), 0, "Expected 0 rows but got 
{}".format(len(rows)))
+        assert len(rows) == 0, "Expected 0 rows but got {}".format(len(rows))
 
-    def query_all_new_column_test(self):
+    def test_query_all_new_column(self):
         """
         Test that a materialized view created with a 'SELECT *' works as 
expected when adding a new column
         @expected_result The new column is present in the view.
         """
-
         session = self.prepare(user_table=True)
 
         self._insert_data(session)
@@ -778,20 +759,19 @@ class TestMaterializedViews(Tester):
         session.execute("ALTER TABLE users ADD first_name varchar;")
 
         results = list(session.execute("SELECT * FROM users_by_state WHERE 
state = 'TX' AND username = 'user1'"))
-        self.assertEqual(len(results), 1)
-        self.assertTrue(hasattr(results[0], 'first_name'), 'Column 
"first_name" not found')
+        assert len(results) == 1
+        assert hasattr(results[0], 'first_name'), 'Column "first_name" not 
found'
         assert_one(
             session,
             "SELECT * FROM users_by_state WHERE state = 'TX' AND username = 
'user1'",
             ['TX', 'user1', 1968, None, 'f', 'ch@ngem3a', None]
         )
 
-    def query_new_column_test(self):
+    def test_query_new_column(self):
         """
         Test that a materialized view created with 'SELECT <col1, ...>' works 
as expected when adding a new column
         @expected_result The new column is not present in the view.
         """
-
         session = self.prepare(user_table=True)
 
         session.execute(("CREATE MATERIALIZED VIEW users_by_state2 AS SELECT 
username FROM users "
@@ -808,20 +788,19 @@ class TestMaterializedViews(Tester):
         session.execute("ALTER TABLE users ADD first_name varchar;")
 
         results = list(session.execute("SELECT * FROM users_by_state2 WHERE 
state = 'TX' AND username = 'user1'"))
-        self.assertEqual(len(results), 1)
-        self.assertFalse(hasattr(results[0], 'first_name'), 'Column 
"first_name" found in view')
+        assert len(results) == 1
+        assert not hasattr(results[0], 'first_name'), 'Column "first_name" 
found in view'
         assert_one(
             session,
             "SELECT * FROM users_by_state2 WHERE state = 'TX' AND username = 
'user1'",
             ['TX', 'user1']
         )
 
-    def rename_column_test(self):
+    def test_rename_column(self):
         """
         Test that a materialized view created with a 'SELECT *' works as 
expected when renaming a column
         @expected_result The column is also renamed in the view.
         """
-
         session = self.prepare(user_table=True)
 
         self._insert_data(session)
@@ -835,20 +814,19 @@ class TestMaterializedViews(Tester):
         session.execute("ALTER TABLE users RENAME username TO user")
 
         results = list(session.execute("SELECT * FROM users_by_state WHERE 
state = 'TX' AND user = 'user1'"))
-        self.assertEqual(len(results), 1)
-        self.assertTrue(hasattr(results[0], 'user'), 'Column "user" not found')
+        assert len(results) == 1
+        assert hasattr(results[0], 'user'), 'Column "user" not found'
         assert_one(
             session,
             "SELECT state, user, birth_year, gender FROM users_by_state WHERE 
state = 'TX' AND user = 'user1'",
             ['TX', 'user1', 1968, 'f']
         )
 
-    def rename_column_atomicity_test(self):
+    def test_rename_column_atomicity(self):
         """
         Test that column renaming is atomically done between a table and its 
materialized views
         @jira_ticket CASSANDRA-12952
         """
-
         session = self.prepare(nodes=1, user_table=True, install_byteman=True)
         node = self.cluster.nodelist()[0]
 
@@ -861,13 +839,13 @@ class TestMaterializedViews(Tester):
         )
 
         # Rename a column with an injected byteman rule to kill the node after 
the first schema update
-        self.allow_log_errors = True
+        self.fixture_dtest_setup.allow_log_errors = True
         script_version = '4x' if self.cluster.version() >= '4' else '3x'
         
node.byteman_submit(['./byteman/merge_schema_failure_{}.btm'.format(script_version)])
-        with self.assertRaises(NoHostAvailable):
+        with pytest.raises(NoHostAvailable):
             session.execute("ALTER TABLE users RENAME username TO user")
 
-        debug('Restarting node')
+        logger.debug('Restarting node')
         node.stop()
         node.start(wait_for_binary_proto=True)
         session = self.patient_cql_connection(node, 
consistency_level=ConsistencyLevel.ONE)
@@ -884,58 +862,57 @@ class TestMaterializedViews(Tester):
             ['TX', 'user1', 1968, 'f', 'ch@ngem3a', None]
         )
 
-    def lwt_test(self):
+    def test_lwt(self):
         """Test that lightweight transaction behave properly with a 
materialized view"""
-
         session = self.prepare()
 
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, 
v3 decimal)")
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        debug("Inserting initial data using IF NOT EXISTS")
-        for i in xrange(1000):
+        logger.debug("Inserting initial data using IF NOT EXISTS")
+        for i in range(1000):
             session.execute(
                 "INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF 
NOT EXISTS".format(v=i)
             )
         self._replay_batchlogs()
 
-        debug("All rows should have been inserted")
-        for i in xrange(1000):
+        logger.debug("All rows should have been inserted")
+        for i in range(1000):
             assert_one(
                 session,
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
                 [i, i, 'a', 3.0]
             )
 
-        debug("Tyring to UpInsert data with a different value using IF NOT 
EXISTS")
-        for i in xrange(1000):
+        logger.debug("Trying to UpInsert data with a different value using IF NOT EXISTS")
+        for i in range(1000):
             v = i * 2
             session.execute(
                 "INSERT INTO t (id, v, v2, v3) VALUES ({id}, {v}, 'a', 3.0) IF 
NOT EXISTS".format(id=i, v=v)
             )
         self._replay_batchlogs()
 
-        debug("No rows should have changed")
-        for i in xrange(1000):
+        logger.debug("No rows should have changed")
+        for i in range(1000):
             assert_one(
                 session,
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
                 [i, i, 'a', 3.0]
             )
 
-        debug("Update the 10 first rows with a different value")
-        for i in xrange(1000):
+        logger.debug("Update the 10 first rows with a different value")
+        for i in range(1000):
             v = i + 2000
             session.execute(
                 "UPDATE t SET v={v} WHERE id = {id} IF v < 10".format(id=i, 
v=v)
             )
         self._replay_batchlogs()
 
-        debug("Verify that only the 10 first rows changed.")
+        logger.debug("Verify that only the 10 first rows changed.")
         results = list(session.execute("SELECT * FROM t_by_v;"))
-        self.assertEqual(len(results), 1000)
-        for i in xrange(1000):
+        assert len(results) == 1000
+        for i in range(1000):
             v = i + 2000 if i < 10 else i
             assert_one(
                 session,
@@ -943,18 +920,18 @@ class TestMaterializedViews(Tester):
                 [v, i, 'a', 3.0]
             )
 
-        debug("Deleting the first 10 rows")
-        for i in xrange(1000):
+        logger.debug("Deleting the first 10 rows")
+        for i in range(1000):
             v = i + 2000
             session.execute(
                 "DELETE FROM t WHERE id = {id} IF v = {v} ".format(id=i, v=v)
             )
         self._replay_batchlogs()
 
-        debug("Verify that only the 10 first rows have been deleted.")
+        logger.debug("Verify that only the 10 first rows have been deleted.")
         results = list(session.execute("SELECT * FROM t_by_v;"))
-        self.assertEqual(len(results), 990)
-        for i in xrange(10, 1000):
+        assert len(results) == 990
+        for i in range(10, 1000):
             assert_one(
                 session,
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
@@ -971,7 +948,7 @@ class TestMaterializedViews(Tester):
         session = self.prepare(options=options, install_byteman=True)
         node1, node2, node3 = self.cluster.nodelist()
 
-        debug("Avoid premature MV build finalization with byteman")
+        logger.debug("Avoid premature MV build finalization with byteman")
         for node in self.cluster.nodelist():
             if self.cluster.version() >= '4':
                 
node.byteman_submit(['./byteman/4.0/skip_view_build_finalization.btm'])
@@ -982,42 +959,42 @@ class TestMaterializedViews(Tester):
 
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, 
v3 decimal)")
 
-        debug("Inserting initial data")
-        for i in xrange(10000):
+        logger.debug("Inserting initial data")
+        for i in range(10000):
             session.execute(
                 "INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF 
NOT EXISTS".format(v=i)
             )
 
-        debug("Create a MV")
+        logger.debug("Create a MV")
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        debug("Wait and ensure the MV build has started. Waiting up to 2 
minutes.")
+        logger.debug("Wait and ensure the MV build has started. Waiting up to 
2 minutes.")
         self._wait_for_view_build_start(session, 'ks', 't_by_v', 
wait_minutes=2)
 
-        debug("Stop the cluster. Interrupt the MV build process.")
+        logger.debug("Stop the cluster. Interrupt the MV build process.")
         self.cluster.stop()
 
-        debug("Checking logs to verify that the view build tasks have been 
created")
+        logger.debug("Checking logs to verify that the view build tasks have 
been created")
         for node in self.cluster.nodelist():
-            self.assertTrue(node.grep_log('Starting new view build', 
filename='debug.log'))
-            self.assertFalse(node.grep_log('Resuming view build', 
filename='debug.log'))
+            assert node.grep_log('Starting new view build', 
filename='debug.log')
+            assert not node.grep_log('Resuming view build', 
filename='debug.log')
             node.mark_log(filename='debug.log')
 
-        debug("Restart the cluster")
+        logger.debug("Restart the cluster")
         self.cluster.start(wait_for_binary_proto=True)
         session = self.patient_cql_connection(node1)
         session.execute("USE ks")
 
-        debug("MV shouldn't be built yet.")
-        self.assertNotEqual(len(list(session.execute("SELECT COUNT(*) FROM 
t_by_v"))), 10000)
+        logger.debug("MV shouldn't be built yet.")
+        assert len(list(session.execute("SELECT COUNT(*) FROM t_by_v"))) != 
10000
 
-        debug("Wait and ensure the MV build resumed. Waiting up to 2 minutes.")
+        logger.debug("Wait and ensure the MV build resumed. Waiting up to 2 
minutes.")
         self._wait_for_view("ks", "t_by_v")
 
-        debug("Verify all data")
+        logger.debug("Verify all data")
         assert_one(session, "SELECT COUNT(*) FROM t_by_v", [10000])
-        for i in xrange(10000):
+        for i in range(10000):
             assert_one(
                 session,
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
@@ -1025,10 +1002,11 @@ class TestMaterializedViews(Tester):
                 cl=ConsistencyLevel.ALL
             )
 
-        debug("Checking logs to verify that some view build tasks have been 
resumed")
+        logger.debug("Checking logs to verify that some view build tasks have 
been resumed")
         for node in self.cluster.nodelist():
-            self.assertTrue(node.grep_log('Resuming view build', 
filename='debug.log'))
+            assert node.grep_log('Resuming view build', filename='debug.log')
 
+    @pytest.mark.skip(reason="Frequently fails in CI. Skipping until fixed as 
tracked by CASSANDRA-14148")
     @since('4.0')
     def test_drop_while_building(self):
         """Test that a parallel MV build is interrupted when the view is 
removed"""
@@ -1036,31 +1014,31 @@ class TestMaterializedViews(Tester):
         session = 
self.prepare(options={'concurrent_materialized_view_builders': 4}, 
install_byteman=True)
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, 
v3 decimal)")
 
-        debug("Inserting initial data")
-        for i in xrange(5000):
+        logger.debug("Inserting initial data")
+        for i in range(5000):
             session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 
'a', 3.0) IF NOT EXISTS".format(v=i))
 
-        debug("Slowing down MV build with byteman")
+        logger.debug("Slowing down MV build with byteman")
         for node in self.cluster.nodelist():
             node.byteman_submit(['./byteman/4.0/view_builder_task_sleep.btm'])
 
-        debug("Create a MV")
+        logger.debug("Create a MV")
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        debug("Wait and ensure the MV build has started. Waiting up to 2 
minutes.")
+        logger.debug("Wait and ensure the MV build has started. Waiting up to 
2 minutes.")
         self._wait_for_view_build_start(session, 'ks', 't_by_v', 
wait_minutes=2)
 
-        debug("Drop the MV while it is still building")
+        logger.debug("Drop the MV while it is still building")
         session.execute("DROP MATERIALIZED VIEW t_by_v")
 
-        debug("Verify that the build has been stopped before its finalization 
without errors")
+        logger.debug("Verify that the build has been stopped before its 
finalization without errors")
         for node in self.cluster.nodelist():
             self.check_logs_for_errors()
-            self.assertFalse(node.grep_log('Marking view', 
filename='debug.log'))
-            self.assertTrue(node.grep_log('Stopping current view builder due 
to schema change', filename='debug.log'))
+            assert not node.grep_log('Marking view', filename='debug.log')
+            assert node.grep_log('Stopping current view builder due to schema 
change', filename='debug.log')
 
-        debug("Verify that the view has been removed")
+        logger.debug("Verify that the view has been removed")
         failed = False
         try:
             session.execute("SELECT COUNT(*) FROM t_by_v")
@@ -1068,11 +1046,11 @@ class TestMaterializedViews(Tester):
             failed = True
         self.assertTrue(failed, "The view shouldn't be queryable")
 
-        debug("Create the MV again")
+        logger.debug("Create the MV again")
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        debug("Verify that the MV has been successfully created")
+        logger.debug("Verify that the MV has been successfully created")
         self._wait_for_view('ks', 't_by_v')
         assert_one(session, "SELECT COUNT(*) FROM t_by_v", [5000])
 
@@ -1084,54 +1062,54 @@ class TestMaterializedViews(Tester):
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, 
v3 decimal)")
         nodes = self.cluster.nodelist()
 
-        debug("Inserting initial data")
-        for i in xrange(5000):
+        logger.debug("Inserting initial data")
+        for i in range(5000):
             session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 
'a', 3.0) IF NOT EXISTS".format(v=i))
 
-        debug("Slowing down MV build with byteman")
+        logger.debug("Slowing down MV build with byteman")
         for node in nodes:
             node.byteman_submit(['./byteman/4.0/view_builder_task_sleep.btm'])
 
-        debug("Create a MV")
+        logger.debug("Create a MV")
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        debug("Wait and ensure the MV build has started. Waiting up to 2 
minutes.")
+        logger.debug("Wait and ensure the MV build has started. Waiting up to 
2 minutes.")
         self._wait_for_view_build_start(session, 'ks', 't_by_v', 
wait_minutes=2)
 
-        debug("Stopping all running view build tasks with nodetool")
+        logger.debug("Stopping all running view build tasks with nodetool")
         for node in nodes:
-            node.watch_log_for('Starting new view build for range', 
filename='debug.log', timeout=60)
+            node.watch_log_for('Starting new view build for range', 
filename='debug.log', timeout=120)
             node.nodetool('stop VIEW_BUILD')
 
-        debug("Checking logs to verify that some view build tasks have been 
stopped")
+        logger.debug("Checking logs to verify that some view build tasks have 
been stopped")
         for node in nodes:
-            node.watch_log_for('Stopped build for view', filename='debug.log', 
timeout=60)
-            node.watch_log_for('Compaction interrupted: View build', 
filename='system.log', timeout=60)
+            node.watch_log_for('Stopped build for view', filename='debug.log', 
timeout=120)
+            node.watch_log_for('Compaction interrupted: View build', 
filename='system.log', timeout=120)
             self.check_logs_for_errors()
 
-        debug("Drop the MV while it is still building")
+        logger.debug("Drop the MV while it is still building")
         session.execute("DROP MATERIALIZED VIEW t_by_v")
 
-        debug("Verify that the build has been stopped before its finalization 
without errors")
+        logger.debug("Verify that the build has been stopped before its 
finalization without errors")
         for node in nodes:
             self.check_logs_for_errors()
-            self.assertFalse(node.grep_log('Marking view', 
filename='debug.log'))
-            self.assertTrue(node.grep_log('Stopping current view builder due 
to schema change', filename='debug.log'))
+            assert not node.grep_log('Marking view', filename='debug.log')
+            assert node.grep_log('Stopping current view builder due to schema 
change', filename='debug.log')
 
-        debug("Verify that the view has been removed")
+        logger.debug("Verify that the view has been removed")
         failed = False
         try:
             session.execute("SELECT COUNT(*) FROM t_by_v")
         except InvalidRequest:
             failed = True
-        self.assertTrue(failed, "The view shouldn't be queryable")
+        assert failed, "The view shouldn't be queryable"
 
-        debug("Create the MV again")
+        logger.debug("Create the MV again")
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        debug("Verify that the MV has been successfully created")
+        logger.debug("Verify that the MV has been successfully created")
         self._wait_for_view('ks', 't_by_v')
         assert_one(session, "SELECT COUNT(*) FROM t_by_v", [5000])
 
@@ -1143,51 +1121,51 @@ class TestMaterializedViews(Tester):
         session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, 
v3 decimal)")
         nodes = self.cluster.nodelist()
 
-        debug("Inserting initial data")
-        for i in xrange(5000):
+        logger.debug("Inserting initial data")
+        for i in range(5000):
             session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 
'a', 3.0) IF NOT EXISTS".format(v=i))
 
-        debug("Slowing down MV build with byteman")
+        logger.debug("Slowing down MV build with byteman")
         for node in nodes:
             node.byteman_submit(['./byteman/4.0/view_builder_task_sleep.btm'])
 
-        debug("Create a MV")
+        logger.debug("Create a MV")
         session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                          "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
 
-        debug("Wait and ensure the MV build has started. Waiting up to 2 
minutes.")
+        logger.debug("Wait and ensure the MV build has started. Waiting up to 
2 minutes.")
         self._wait_for_view_build_start(session, 'ks', 't_by_v', 
wait_minutes=2)
 
-        debug("Stopping all running view build tasks with nodetool")
+        logger.debug("Stopping all running view build tasks with nodetool")
         for node in nodes:
-            node.watch_log_for('Starting new view build for range', 
filename='debug.log', timeout=60)
+            node.watch_log_for('Starting new view build for range', 
filename='debug.log', timeout=120)
             node.nodetool('stop VIEW_BUILD')
 
-        debug("Checking logs to verify that some view build tasks have been 
stopped")
+        logger.debug("Checking logs to verify that some view build tasks have 
been stopped")
         for node in nodes:
-            node.watch_log_for('Stopped build for view', filename='debug.log', 
timeout=60)
-            node.watch_log_for('Compaction interrupted: View build', 
filename='system.log', timeout=60)
-            node.watch_log_for('Interrupted build for view', 
filename='debug.log', timeout=60)
-            self.assertFalse(node.grep_log('Marking view', 
filename='debug.log'))
+            node.watch_log_for('Stopped build for view', filename='debug.log', 
timeout=120)
+            node.watch_log_for('Compaction interrupted: View build', 
filename='system.log', timeout=120)
+            node.watch_log_for('Interrupted build for view', 
filename='debug.log', timeout=120)
+            assert not node.grep_log('Marking view', filename='debug.log')
             self.check_logs_for_errors()
 
-        debug("Check that MV shouldn't be built yet.")
-        self.assertNotEqual(len(list(session.execute("SELECT COUNT(*) FROM 
t_by_v"))), 5000)
+        logger.debug("Check that MV shouldn't be built yet.")
+        assert len(list(session.execute("SELECT COUNT(*) FROM t_by_v"))) != 
5000
 
-        debug("Restart the cluster")
+        logger.debug("Restart the cluster")
         self.cluster.stop()
         marks = [node.mark_log() for node in nodes]
         self.cluster.start(wait_for_binary_proto=True)
         session = self.patient_cql_connection(nodes[0])
 
-        debug("Verify that the MV has been successfully created")
+        logger.debug("Verify that the MV has been successfully created")
         self._wait_for_view('ks', 't_by_v')
         assert_one(session, "SELECT COUNT(*) FROM ks.t_by_v", [5000])
 
-        debug("Checking logs to verify that the view build has been resumed 
and completed after restart")
+        logger.debug("Checking logs to verify that the view build has been 
resumed and completed after restart")
         for node, mark in zip(nodes, marks):
-            self.assertTrue(node.grep_log('Resuming view build', 
filename='debug.log', from_mark=mark))
-            self.assertTrue(node.grep_log('Marking view', 
filename='debug.log', from_mark=mark))
+            assert node.grep_log('Resuming view build', filename='debug.log', 
from_mark=mark)
+            assert node.grep_log('Marking view', filename='debug.log', 
from_mark=mark)
             self.check_logs_for_errors()
 
     @since('3.0')
@@ -1207,7 +1185,7 @@ class TestMaterializedViews(Tester):
         node1, node2, node3 = self.cluster.nodelist()
         session.execute('USE ks')
 
-        debug("MV with same key and unselected columns")
+        logger.debug("MV with same key and unselected columns")
         session.execute("CREATE TABLE t2 (k int, a int, b int, c int, primary 
key(k, a)) with default_time_to_live=600")
         session.execute(("CREATE MATERIALIZED VIEW mv2 AS SELECT k,a,b FROM t2 
"
                          "WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY 
(a, k)"))
@@ -1250,7 +1228,7 @@ class TestMaterializedViews(Tester):
             assert_none(session, "SELECT * FROM t2")
             assert_none(session, "SELECT * FROM mv2")
 
-        debug("MV with extra key")
+        logger.debug("MV with extra key")
         session.execute("CREATE TABLE t (k int PRIMARY KEY, a int, b int) with 
default_time_to_live=600")
         session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t "
                          "WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY 
(k, a)"))
@@ -1291,10 +1269,12 @@ class TestMaterializedViews(Tester):
             assert_one(session, "SELECT * FROM t", [1, 6, 1])
             assert_one(session, "SELECT * FROM mv", [1, 6, 1])
 
+    @flaky
     @since('3.0')
     def test_no_base_column_in_view_pk_complex_timestamp_with_flush(self):
         self._test_no_base_column_in_view_pk_complex_timestamp(flush=True)
 
+    @pytest.mark.skip(reason="Frequently fails in CI. Skipping until fixed as 
tracked by CASSANDRA-14148")
     @since('3.0')
     def test_no_base_column_in_view_pk_complex_timestamp_without_flush(self):
         self._test_no_base_column_in_view_pk_complex_timestamp(flush=False)
@@ -1475,9 +1455,9 @@ class TestMaterializedViews(Tester):
         assert_one(session, "SELECT k,a,b FROM mv WHERE k = 2", [2, 2, 2])
 
         # stop node2, node3
-        debug('Shutdown node2')
+        logger.debug('Shutdown node2')
         node2.stop(wait_other_notice=True)
-        debug('Shutdown node3')
+        logger.debug('Shutdown node3')
         node3.stop(wait_other_notice=True)
         # shadow a = 1, create a = 2
         query = SimpleStatement("UPDATE t USING TIMESTAMP 9 SET a = 2 WHERE k 
= 1", consistency_level=ConsistencyLevel.ONE)
@@ -1486,33 +1466,33 @@ class TestMaterializedViews(Tester):
         query = SimpleStatement("UPDATE t USING TTL 3 SET a = 2 WHERE k = 2", 
consistency_level=ConsistencyLevel.ONE)
         self.update_view(session, query, flush)
 
-        debug('Starting node2')
+        logger.debug('Starting node2')
         node2.start(wait_other_notice=True, wait_for_binary_proto=True)
-        debug('Starting node3')
+        logger.debug('Starting node3')
         node3.start(wait_other_notice=True, wait_for_binary_proto=True)
 
         # For k = 1 & a = 1, We should get a digest mismatch of tombstones and 
repaired
         query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 1", 
consistency_level=ConsistencyLevel.ALL)
         result = session.execute(query, trace=True)
         self.check_trace_events(result.get_query_trace(), True)
-        self.assertEqual(0, len(result.current_rows))
+        assert 0 == len(result.current_rows)
 
         # For k = 1 & a = 1, second time no digest mismatch
         result = session.execute(query, trace=True)
         self.check_trace_events(result.get_query_trace(), False)
         assert_none(session, "SELECT * FROM mv WHERE k = 1 AND a = 1")
-        self.assertEqual(0, len(result.current_rows))
+        assert 0 == len(result.current_rows)
 
         # For k = 1 & a = 2, We should get a digest mismatch of data and 
repaired for a = 2
         query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 2", 
consistency_level=ConsistencyLevel.ALL)
         result = session.execute(query, trace=True)
         self.check_trace_events(result.get_query_trace(), True)
-        self.assertEqual(1, len(result.current_rows))
+        assert 1 == len(result.current_rows)
 
         # For k = 1 & a = 2, second time no digest mismatch
         result = session.execute(query, trace=True)
         self.check_trace_events(result.get_query_trace(), False)
-        self.assertEqual(1, len(result.current_rows))
+        assert 1 == len(result.current_rows)
         assert_one(session, "SELECT k,a,b,writetime(b) FROM mv WHERE k = 1", 
[1, 2, 1, 20])
 
         time.sleep(3)
@@ -1520,13 +1500,13 @@ class TestMaterializedViews(Tester):
         query = SimpleStatement("SELECT * FROM mv WHERE k = 2 AND a = 2", 
consistency_level=ConsistencyLevel.ALL)
         result = session.execute(query, trace=True)
         self.check_trace_events(result.get_query_trace(), True)
-        debug(result.current_rows)
-        self.assertEqual(0, len(result.current_rows))
+        logger.debug(result.current_rows)
+        assert 0 == len(result.current_rows)
 
         # For k = 2 & a = 2, second time no digest mismatch
         result = session.execute(query, trace=True)
         self.check_trace_events(result.get_query_trace(), False)
-        self.assertEqual(0, len(result.current_rows))
+        assert 0 == len(result.current_rows)
 
     @since('3.0')
     def test_expired_liveness_with_limit_rf1_nodes1(self):
@@ -1555,11 +1535,11 @@ class TestMaterializedViews(Tester):
                          "WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY 
(k, a)"))
         session.cluster.control_connection.wait_for_schema_agreement()
 
-        for k in xrange(100):
+        for k in range(100):
             session.execute("INSERT INTO t (k, a, b) VALUES ({}, {}, 
{})".format(k, k, k))
 
         # generate view row with expired liveness except for row 50 and 99
-        for k in xrange(100):
+        for k in range(100):
             if k == 50 or k == 99:
                 continue
             session.execute("DELETE a FROM t where k = {};".format(k))
@@ -1570,7 +1550,7 @@ class TestMaterializedViews(Tester):
         assert_all(session, "SELECT k,a,b FROM mv", [[50, 50, 50], [99, 99, 
99]])
 
         # verify IN
-        keys = xrange(100)
+        keys = range(100)
         assert_one(session, "SELECT k,a,b FROM mv WHERE k in ({}) limit 
1".format(', '.join(str(x) for x in keys)),
                    [50, 50, 50])
         assert_all(session, "SELECT k,a,b FROM mv WHERE k in ({}) limit 
2".format(', '.join(str(x) for x in keys)),
@@ -1642,7 +1622,7 @@ class TestMaterializedViews(Tester):
         assert_none(session, "SELECT * FROM t_by_v")
         assert_one(session, "SELECT * FROM t", [1, None, None, None])
 
-    def view_tombstone_test(self):
+    def test_view_tombstone(self):
         """
         Test that a materialized views properly tombstone
 
@@ -1690,7 +1670,7 @@ class TestMaterializedViews(Tester):
 
         assert_none(session, "SELECT * FROM t_by_v WHERE v = 1")
 
-        debug('Shutdown node2')
+        logger.debug('Shutdown node2')
         node2.stop(wait_other_notice=True)
 
         session.execute(SimpleStatement("UPDATE t USING TIMESTAMP 4 SET v = 1 
WHERE id = 1",
@@ -1746,10 +1726,10 @@ class TestMaterializedViews(Tester):
             if expect_digest:
                 self.fail("Didn't find digest mismatch")
 
-    def simple_repair_test_by_base(self):
+    def test_simple_repair_by_base(self):
         self._simple_repair_test(repair_base=True)
 
-    def simple_repair_test_by_view(self):
+    def test_simple_repair_by_view(self):
         self._simple_repair_test(repair_view=True)
 
     def _simple_repair_test(self, repair_base=False, repair_view=False):
@@ -1766,24 +1746,24 @@ class TestMaterializedViews(Tester):
 
         session.cluster.control_connection.wait_for_schema_agreement()
 
-        debug('Shutdown node2')
+        logger.debug('Shutdown node2')
         node2.stop(wait_other_notice=True)
 
-        for i in xrange(1000):
+        for i in range(1000):
             session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 
'a', 3.0)".format(v=i))
 
         self._replay_batchlogs()
 
-        debug('Verify the data in the MV with CL=ONE')
-        for i in xrange(1000):
+        logger.debug('Verify the data in the MV with CL=ONE')
+        for i in range(1000):
             assert_one(
                 session,
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
                 [i, i, 'a', 3.0]
             )
 
-        debug('Verify the data in the MV with CL=ALL. All should be 
unavailable.')
-        for i in xrange(1000):
+        logger.debug('Verify the data in the MV with CL=ALL. All should be 
unavailable.')
+        for i in range(1000):
             statement = SimpleStatement(
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
                 consistency_level=ConsistencyLevel.ALL
@@ -1794,27 +1774,27 @@ class TestMaterializedViews(Tester):
                 statement
             )
 
-        debug('Start node2, and repair')
+        logger.debug('Start node2, and repair')
         node2.start(wait_other_notice=True, wait_for_binary_proto=True)
         if repair_base:
             node1.nodetool("repair ks t")
         if repair_view:
             node1.nodetool("repair ks t_by_v")
 
-        debug('Verify the data in the MV with CL=ALL. All should be available 
now and no digest mismatch')
-        for i in xrange(1000):
+        logger.debug('Verify the data in the MV with CL=ALL. All should be 
available now and no digest mismatch')
+        for i in range(1000):
             query = SimpleStatement(
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
                 consistency_level=ConsistencyLevel.ALL
             )
             result = session.execute(query, trace=True)
             self.check_trace_events(result.get_query_trace(), False)
-            self.assertEquals(self._rows_to_list(result.current_rows), [[i, i, 
'a', 3.0]])
+            assert self._rows_to_list(result.current_rows) == [[i, i, 'a', 
3.0]]
 
-    def base_replica_repair_test(self):
+    def test_base_replica_repair(self):
         self._base_replica_repair_test()
 
-    def base_replica_repair_with_contention_test(self):
+    def test_base_replica_repair_with_contention(self):
         """
         Test repair does not fail when there is MV lock contention
         @jira_ticket CASSANDRA-12905
@@ -1837,14 +1817,14 @@ class TestMaterializedViews(Tester):
 
         session.cluster.control_connection.wait_for_schema_agreement()
 
-        debug('Write initial data')
-        for i in xrange(1000):
+        logger.debug('Write initial data')
+        for i in range(1000):
             session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 
'a', 3.0)".format(v=i))
 
         self._replay_batchlogs()
 
-        debug('Verify the data in the MV with CL=ALL')
-        for i in xrange(1000):
+        logger.debug('Verify the data in the MV with CL=ALL')
+        for i in range(1000):
             assert_one(
                 session,
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
@@ -1852,9 +1832,9 @@ class TestMaterializedViews(Tester):
                 cl=ConsistencyLevel.ALL
             )
 
-        debug('Shutdown node1')
+        logger.debug('Shutdown node1')
         node1.stop(wait_other_notice=True)
-        debug('Delete node1 data')
+        logger.debug('Delete node1 data')
         node1.clear(clear_all=True)
 
         jvm_args = []
@@ -1864,44 +1844,43 @@ class TestMaterializedViews(Tester):
             jvm_args.append("-Dcassandra.test.fail_mv_locks_count=1000")
             # this should not make Keyspace.apply throw WTE on failure to 
acquire lock
             
node1.set_configuration_options(values={'write_request_timeout_in_ms': 100})
-        debug('Restarting node1 with jvm_args={}'.format(jvm_args))
+        logger.debug('Restarting node1 with jvm_args={}'.format(jvm_args))
         node1.start(wait_other_notice=True, wait_for_binary_proto=True, 
jvm_args=jvm_args)
-        debug('Shutdown node2 and node3')
+        logger.debug('Shutdown node2 and node3')
         node2.stop(wait_other_notice=True)
         node3.stop(wait_other_notice=True)
 
         session = self.patient_exclusive_cql_connection(node1)
         session.execute('USE ks')
 
-        debug('Verify that there is no data on node1')
-        for i in xrange(1000):
+        logger.debug('Verify that there is no data on node1')
+        for i in range(1000):
             assert_none(
                 session,
                 "SELECT * FROM t_by_v WHERE v = {}".format(i)
             )
 
-        debug('Restarting node2 and node3')
+        logger.debug('Restarting node2 and node3')
         node2.start(wait_other_notice=True, wait_for_binary_proto=True)
         node3.start(wait_other_notice=True, wait_for_binary_proto=True)
 
         # Just repair the base replica
-        debug('Starting repair on node1')
+        logger.debug('Starting repair on node1')
         node1.nodetool("repair ks t")
 
-        debug('Verify data with cl=ALL')
-        for i in xrange(1000):
+        logger.debug('Verify data with cl=ALL')
+        for i in range(1000):
             assert_one(
                 session,
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
                 [i, i, 'a', 3.0]
             )
 
-    @attr("resource-intensive")
-    def complex_repair_test(self):
+    @pytest.mark.resource_intensive
+    def test_complex_repair(self):
         """
         Test that a materialized view are consistent after a more complex 
repair.
         """
-
         session = self.prepare(rf=5, options={'hinted_handoff_enabled': 
False}, nodes=5)
         node1, node2, node3, node4, node5 = self.cluster.nodelist()
 
@@ -1913,49 +1892,49 @@ class TestMaterializedViews(Tester):
 
         session.cluster.control_connection.wait_for_schema_agreement()
 
-        debug('Shutdown node2 and node3')
+        logger.debug('Shutdown node2 and node3')
         node2.stop()
         node3.stop(wait_other_notice=True)
 
-        debug('Write initial data to node1 (will be replicated to node4 and 
node5)')
-        for i in xrange(1000):
+        logger.debug('Write initial data to node1 (will be replicated to node4 
and node5)')
+        for i in range(1000):
             session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES ({v}, 
{v}, 'a', 3.0)".format(v=i))
 
-        debug('Verify the data in the MV on node1 with CL=ONE')
-        for i in xrange(1000):
+        logger.debug('Verify the data in the MV on node1 with CL=ONE')
+        for i in range(1000):
             assert_one(
                 session,
                 "SELECT * FROM ks.t_by_v WHERE v = {}".format(i),
                 [i, i, 'a', 3.0]
             )
 
-        debug('Close connection to node1')
+        logger.debug('Close connection to node1')
         session.cluster.shutdown()
-        debug('Shutdown node1, node4 and node5')
+        logger.debug('Shutdown node1, node4 and node5')
         node1.stop()
         node4.stop()
         node5.stop()
 
-        debug('Start nodes 2 and 3')
+        logger.debug('Start nodes 2 and 3')
         node2.start()
         node3.start(wait_other_notice=True, wait_for_binary_proto=True)
 
         session2 = self.patient_cql_connection(node2)
 
-        debug('Verify the data in the MV on node2 with CL=ONE. No rows should 
be found.')
-        for i in xrange(1000):
+        logger.debug('Verify the data in the MV on node2 with CL=ONE. No rows 
should be found.')
+        for i in range(1000):
             assert_none(
                 session2,
                 "SELECT * FROM ks.t_by_v WHERE v = {}".format(i)
             )
 
-        debug('Write new data in node2 and node3 that overlap those in node1, 
node4 and node5')
-        for i in xrange(1000):
+        logger.debug('Write new data in node2 and node3 that overlap those in 
node1, node4 and node5')
+        for i in range(1000):
             # we write i*2 as value, instead of i
             session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES ({v}, 
{v}, 'a', 3.0)".format(v=i * 2))
 
-        debug('Verify the new data in the MV on node2 with CL=ONE')
-        for i in xrange(1000):
+        logger.debug('Verify the new data in the MV on node2 with CL=ONE')
+        for i in range(1000):
             v = i * 2
             assert_one(
                 session2,
@@ -1963,18 +1942,18 @@ class TestMaterializedViews(Tester):
                 [v, v, 'a', 3.0]
             )
 
-        debug('Wait for batchlogs to expire from node2 and node3')
+        logger.debug('Wait for batchlogs to expire from node2 and node3')
         time.sleep(5)
 
-        debug('Start remaining nodes')
+        logger.debug('Start remaining nodes')
         node1.start(wait_other_notice=True, wait_for_binary_proto=True)
         node4.start(wait_other_notice=True, wait_for_binary_proto=True)
         node5.start(wait_other_notice=True, wait_for_binary_proto=True)
 
         session = self.patient_cql_connection(node1)
 
-        debug('Read data from MV at QUORUM (old data should be returned)')
-        for i in xrange(1000):
+        logger.debug('Read data from MV at QUORUM (old data should be 
returned)')
+        for i in range(1000):
             assert_one(
                 session,
                 "SELECT * FROM ks.t_by_v WHERE v = {}".format(i),
@@ -1982,11 +1961,11 @@ class TestMaterializedViews(Tester):
                 cl=ConsistencyLevel.QUORUM
             )
 
-        debug('Run global repair on node1')
+        logger.debug('Run global repair on node1')
         node1.repair()
 
-        debug('Read data from MV at quorum (new data should be returned after 
repair)')
-        for i in xrange(1000):
+        logger.debug('Read data from MV at quorum (new data should be returned 
after repair)')
+        for i in range(1000):
             v = i * 2
             assert_one(
                 session,
@@ -1995,8 +1974,8 @@ class TestMaterializedViews(Tester):
                 cl=ConsistencyLevel.QUORUM
             )
 
-    @attr('resource-intensive')
-    def throttled_partition_update_test(self):
+    @pytest.mark.resource_intensive
+    def test_throttled_partition_update(self):
         """
         @jira_ticket: CASSANDRA-13299, test break up large partition when 
repairing base with mv.
 
@@ -2017,7 +1996,7 @@ class TestMaterializedViews(Tester):
 
         session.cluster.control_connection.wait_for_schema_agreement()
 
-        debug('Shutdown node2 and node3')
+        logger.debug('Shutdown node2 and node3')
         node2.stop(wait_other_notice=True)
         node3.stop(wait_other_notice=True)
 
@@ -2025,26 +2004,26 @@ class TestMaterializedViews(Tester):
         range_deletion_ts = 30
         partition_deletion_ts = 10
 
-        for ck1 in xrange(size):
-            for ck2 in xrange(size):
+        for ck1 in range(size):
+            for ck2 in range(size):
                 session.execute("INSERT INTO ks.t (pk, ck1, ck2, v1, v2)"
                                 " VALUES (1, {}, {}, {}, {}) USING TIMESTAMP 
{}".format(ck1, ck2, ck1, ck2, ck1))
 
         self._replay_batchlogs()
 
-        for ck1 in xrange(size):
-            for ck2 in xrange(size):
+        for ck1 in range(size):
+            for ck2 in range(size):
                 assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE 
pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
                            [1, ck1, ck2, ck1, ck2])
                 assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v 
WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
                            [1, ck1, ck2, ck1, ck2])
 
-        debug('Shutdown node4 and node5')
+        logger.debug('Shutdown node4 and node5')
         node4.stop(wait_other_notice=True)
         node5.stop(wait_other_notice=True)
 
-        for ck1 in xrange(size):
-            for ck2 in xrange(size):
+        for ck1 in range(size):
+            for ck2 in range(size):
                 if ck1 % 2 == 0:  # range tombstone
                     session.execute("DELETE FROM ks.t USING TIMESTAMP 50 WHERE 
pk=1 AND ck1={}".format(ck1))
                 elif ck1 == ck2:  # row tombstone
@@ -2061,27 +2040,27 @@ class TestMaterializedViews(Tester):
         self._replay_batchlogs()
 
         # start nodes with different batch size
-        debug('Starting nodes')
+        logger.debug('Starting nodes')
         node2.start(wait_other_notice=True, wait_for_binary_proto=True, 
jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(2)])
         node3.start(wait_other_notice=True, wait_for_binary_proto=True, 
jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5)])
         node4.start(wait_other_notice=True, wait_for_binary_proto=True, 
jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(50)])
         node5.start(wait_other_notice=True, wait_for_binary_proto=True, 
jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5000)])
         self._replay_batchlogs()
 
-        debug('repairing base table')
+        logger.debug('repairing base table')
         node1.nodetool("repair ks t")
         self._replay_batchlogs()
 
-        debug('stop cluster')
+        logger.debug('stop cluster')
         self.cluster.stop()
 
-        debug('rolling restart to check repaired data on each node')
+        logger.debug('rolling restart to check repaired data on each node')
         for node in self.cluster.nodelist():
-            debug('starting {}'.format(node.name))
+            logger.debug('starting {}'.format(node.name))
             node.start(wait_other_notice=True, wait_for_binary_proto=True)
             session = self.patient_cql_connection(node, 
consistency_level=ConsistencyLevel.ONE)
-            for ck1 in xrange(size):
-                for ck2 in xrange(size):
+            for ck1 in range(size):
+                for ck2 in range(size):
                     if (
                         ck1 <= partition_deletion_ts or  # partition deletion
                         ck1 == ck2 or ck1 % 2 == 0 or  # row deletion or range 
tombstone
@@ -2101,15 +2080,14 @@ class TestMaterializedViews(Tester):
                                             "ck1={} AND ck2={}".format(ck1, 
ck2), [1, ck1, ck2, ck1, ck2])
                         assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t 
WHERE pk=1 AND "
                                             "ck1={} AND ck2={}".format(ck1, 
ck2), [1, ck1, ck2, ck1, ck2])
-            debug('stopping {}'.format(node.name))
+            logger.debug('stopping {}'.format(node.name))
             node.stop(wait_other_notice=True, wait_for_binary_proto=True)
 
-    @attr('resource-intensive')
-    def really_complex_repair_test(self):
+    @pytest.mark.resource_intensive
+    def test_really_complex_repair(self):
         """
         Test that a materialized view are consistent after a more complex 
repair.
         """
-
         session = self.prepare(rf=5, options={'hinted_handoff_enabled': 
False}, nodes=5)
         node1, node2, node3, node4, node5 = self.cluster.nodelist()
 
@@ -2122,40 +2100,40 @@ class TestMaterializedViews(Tester):
 
         session.cluster.control_connection.wait_for_schema_agreement()
 
-        debug('Shutdown node2 and node3')
+        logger.debug('Shutdown node2 and node3')
         node2.stop(wait_other_notice=True)
         node3.stop(wait_other_notice=True)
 
         session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'a', 
3.0)")
         session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'a', 
3.0)")
         self._replay_batchlogs()
-        debug('Verify the data in the MV on node1 with CL=ONE')
+        logger.debug('Verify the data in the MV on node1 with CL=ONE')
         assert_all(session, "SELECT * FROM ks.t_by_v WHERE v2 = 'a'", [['a', 
1, 1, 3.0], ['a', 2, 2, 3.0]])
 
         session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'b', 
3.0)")
         session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'b', 
3.0)")
         self._replay_batchlogs()
-        debug('Verify the data in the MV on node1 with CL=ONE')
+        logger.debug('Verify the data in the MV on node1 with CL=ONE')
         assert_all(session, "SELECT * FROM ks.t_by_v WHERE v2 = 'b'", [['b', 
1, 1, 3.0], ['b', 2, 2, 3.0]])
 
         session.shutdown()
 
-        debug('Shutdown node1, node4 and node5')
+        logger.debug('Shutdown node1, node4 and node5')
         node1.stop()
         node4.stop()
         node5.stop()
 
-        debug('Start nodes 2 and 3')
+        logger.debug('Start nodes 2 and 3')
         node2.start()
         node3.start(wait_other_notice=True, wait_for_binary_proto=True)
 
         session2 = self.patient_cql_connection(node2)
         session2.execute('USE ks')
 
-        debug('Verify the data in the MV on node2 with CL=ONE. No rows should 
be found.')
+        logger.debug('Verify the data in the MV on node2 with CL=ONE. No rows 
should be found.')
         assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'a'")
 
-        debug('Write new data in node2 that overlap those in node1')
+        logger.debug('Write new data in node2 that overlap those in node1')
         session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'c', 
3.0)")
         session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'c', 
3.0)")
         self._replay_batchlogs()
@@ -2166,17 +2144,17 @@ class TestMaterializedViews(Tester):
         self._replay_batchlogs()
         assert_all(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'd'", [['d', 
1, 1, 3.0], ['d', 2, 2, 3.0]])
 
-        debug("Composite delete of everything")
+        logger.debug("Composite delete of everything")
         session2.execute("DELETE FROM ks.t WHERE id = 1 and v = 1")
         session2.execute("DELETE FROM ks.t WHERE id = 2 and v = 2")
         self._replay_batchlogs()
         assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'c'")
         assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'd'")
 
-        debug('Wait for batchlogs to expire from node2 and node3')
+        logger.debug('Wait for batchlogs to expire from node2 and node3')
         time.sleep(5)
 
-        debug('Start remaining nodes')
+        logger.debug('Start remaining nodes')
         node1.start(wait_other_notice=True, wait_for_binary_proto=True)
         node4.start(wait_other_notice=True, wait_for_binary_proto=True)
         node5.start(wait_other_notice=True, wait_for_binary_proto=True)
@@ -2189,23 +2167,22 @@ class TestMaterializedViews(Tester):
             cl=ConsistencyLevel.QUORUM
         )
 
-        debug('Run global repair on node1')
+        logger.debug('Run global repair on node1')
         node1.repair()
 
         assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'a'", 
cl=ConsistencyLevel.QUORUM)
 
-    def complex_mv_select_statements_test(self):
+    def test_complex_mv_select_statements(self):
         """
         Test complex MV select statements
         @jira_ticket CASSANDRA-9664
         """
-
         cluster = self.cluster
         cluster.populate(3).start()
         node1 = cluster.nodelist()[0]
         session = self.patient_cql_connection(node1, 
consistency_level=ConsistencyLevel.QUORUM)
 
-        debug("Creating keyspace")
+        logger.debug("Creating keyspace")
         session.execute("CREATE KEYSPACE mvtest WITH replication = "
                         "{'class': 'SimpleStrategy', 'replication_factor': 
'3'}")
         session.execute('USE mvtest')
@@ -2240,7 +2217,7 @@ class TestMaterializedViews(Tester):
             for row in rows:
                 session.execute(insert_stmt, row)
 
-            debug("Testing MV primary key: {}".format(mv_primary_key))
+            logger.debug("Testing MV primary key: {}".format(mv_primary_key))
 
             session.execute("CREATE MATERIALIZED VIEW mv AS SELECT * FROM test 
WHERE "
                             "a = 1 AND b IS NOT NULL AND c = 1 PRIMARY KEY 
{}".format(mv_primary_key))
@@ -2352,12 +2329,12 @@ class TestMaterializedViews(Tester):
         cluster.start()
 
         # node3 should have received and ignored the creation of the MV over 
the dropped table
-        self.assertTrue(node3.grep_log('Not adding view users_by_state because 
the base table'))
+        assert node3.grep_log('Not adding view users_by_state because the base 
table')
 
-    def base_view_consistency_on_failure_after_mv_apply_test(self):
+    def test_base_view_consistency_on_failure_after_mv_apply(self):
         self._test_base_view_consistency_on_crash("after")
 
-    def base_view_consistency_on_failure_before_mv_apply_test(self):
+    def test_base_view_consistency_on_failure_before_mv_apply(self):
         self._test_base_view_consistency_on_crash("before")
 
     def _test_base_view_consistency_on_crash(self, fail_phase):
@@ -2370,7 +2347,7 @@ class TestMaterializedViews(Tester):
         """
 
         self.cluster.set_batch_commitlog(enabled=True)
-        self.ignore_log_patterns = [r'Dummy failure', r"Failed to 
force-recycle all segments"]
+        self.fixture_dtest_setup.ignore_log_patterns = [r'Dummy failure', 
r"Failed to force-recycle all segments"]
         self.prepare(rf=1, install_byteman=True)
         node1, node2, node3 = self.cluster.nodelist()
         session = self.patient_exclusive_cql_connection(node1)
@@ -2382,25 +2359,25 @@ class TestMaterializedViews(Tester):
 
         session.cluster.control_connection.wait_for_schema_agreement()
 
-        debug('Make node1 fail {} view writes'.format(fail_phase))
+        logger.debug('Make node1 fail {} view writes'.format(fail_phase))
         
node1.byteman_submit(['./byteman/fail_{}_view_write.btm'.format(fail_phase)])
 
-        debug('Write 1000 rows - all node1 writes should fail')
+        logger.debug('Write 1000 rows - all node1 writes should fail')
 
         failed = False
-        for i in xrange(1, 1000):
+        for i in range(1, 1000):
             try:
                 session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, 
{v}, 'a', 3.0) USING TIMESTAMP {v}".format(v=i))
             except WriteFailure:
                 failed = True
 
-        self.assertTrue(failed, "Should fail at least once.")
-        self.assertTrue(node1.grep_log("Dummy failure"), "Should throw Dummy 
failure")
+        assert failed, "Should fail at least once."
+        assert node1.grep_log("Dummy failure"), "Should throw Dummy failure"
 
         missing_entries = 0
         session = self.patient_exclusive_cql_connection(node1)
         session.execute('USE ks')
-        for i in xrange(1, 1000):
+        for i in range(1, 1000):
             view_entry = rows_to_list(session.execute(SimpleStatement("SELECT 
* FROM t_by_v WHERE id = {} AND v = {}".format(i, i),
                                                       
consistency_level=ConsistencyLevel.ONE)))
             base_entry = rows_to_list(session.execute(SimpleStatement("SELECT 
* FROM t WHERE id = {}".format(i),
@@ -2411,29 +2388,29 @@ class TestMaterializedViews(Tester):
             if not view_entry:
                 missing_entries += 1
 
-        debug("Missing entries {}".format(missing_entries))
-        self.assertTrue(missing_entries > 0, )
+        logger.debug("Missing entries {}".format(missing_entries))
+        assert missing_entries > 0
 
-        debug('Restarting node1 to ensure commit log is replayed')
+        logger.debug('Restarting node1 to ensure commit log is replayed')
         node1.stop(wait_other_notice=True)
         # Set batchlog.replay_timeout_seconds=1 so we can ensure batchlog will 
be replayed below
         node1.start(jvm_args=["-Dcassandra.batchlog.replay_timeout_in_ms=1"])
 
-        debug('Replay batchlogs')
+        logger.debug('Replay batchlogs')
         time.sleep(0.001)  # Wait batchlog.replay_timeout_in_ms=1 (ms)
         self._replay_batchlogs()
 
-        debug('Verify that both the base table entry and view are present 
after commit and batchlog replay')
+        logger.debug('Verify that both the base table entry and view are 
present after commit and batchlog replay')
         session = self.patient_exclusive_cql_connection(node1)
         session.execute('USE ks')
-        for i in xrange(1, 1000):
+        for i in range(1, 1000):
             view_entry = rows_to_list(session.execute(SimpleStatement("SELECT 
* FROM t_by_v WHERE id = {} AND v = {}".format(i, i),
                                                       
consistency_level=ConsistencyLevel.ONE)))
             base_entry = rows_to_list(session.execute(SimpleStatement("SELECT 
* FROM t WHERE id = {}".format(i),
                                                       
consistency_level=ConsistencyLevel.ONE)))
 
-            self.assertTrue(base_entry, "Both base {} and view entry {} should 
exist.".format(base_entry, view_entry))
-            self.assertTrue(view_entry, "Both base {} and view entry {} should 
exist.".format(base_entry, view_entry))
+            assert base_entry, "Both base {} and view entry {} should 
exist.".format(base_entry, view_entry)
+            assert view_entry, "Both base {} and view entry {} should 
exist.".format(base_entry, view_entry)
 
 
 # For read verification
@@ -2513,7 +2490,7 @@ SimpleRow = collections.namedtuple('SimpleRow', 'a b c d')
 
 
 def row_generate(i, num_partitions):
-    return SimpleRow(a=i % num_partitions, b=(i % 400) / num_partitions, c=i, 
d=i)
+    return SimpleRow(a=i % num_partitions, b=(i % 400) // num_partitions, c=i, 
d=i)
 
 
 # Create a threaded session and execute queries from a Queue
@@ -2547,12 +2524,12 @@ def thread_session(ip, queue, start, end, rows, 
num_partitions):
             ret = execute_query(session, select_gi, i)
             queue.put_nowait(ret)
     except Exception as e:
-        print str(e)
+        print(str(e))
         queue.close()
 
 
 @since('3.0')
-@skipIf(sys.platform == 'win32', 'Bug in python on Windows: 
https://bugs.python.org/issue10128')
+@pytest.mark.skipif(sys.platform == 'win32', reason='Bug in python on Windows: 
https://bugs.python.org/issue10128')
 class TestMaterializedViewsConsistency(Tester):
 
     def prepare(self, user_table=False):
@@ -2569,14 +2546,14 @@ class TestMaterializedViewsConsistency(Tester):
         self.rows = {}
         self.up

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to