This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 2c11dfcb5bc2dc25390a4b312911b8401d4a46e6
Merge: 3b1d7c3 d9cdf2d
Author: Andrés de la Peña <a.penya.gar...@gmail.com>
AuthorDate: Fri Apr 23 19:39:14 2021 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   2 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   5 +
 .../org/apache/cassandra/db/view/TableViews.java   |  11 +-
 src/java/org/apache/cassandra/db/view/View.java    |   2 +-
 test/unit/org/apache/cassandra/cql3/ViewTest.java  | 321 ++++++++++++---------
 5 files changed, 209 insertions(+), 132 deletions(-)

diff --cc CHANGES.txt
index d660200,899cb79..f5331f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,74 -1,12 +1,76 @@@
 -3.11.11
 +4.0-rc2
 + * Test org.apache.cassandra.net.AsyncPromiseTest FAILED (CASSANDRA-16596)
 +Merged from 3.11:
   * Ignore stale acks received in the shadow round (CASSANDRA-16588)
++Merged from 3.0:
++ * Fix materialized view builders inserting truncated data (CASSANDRA-16567)
 +
 +4.0-rc1
 + * Allow for setting buffer max capacity to increase it dynamically as needed 
(CASSANDRA-16524)
 + * Harden internode message resource limit accounting against serialization 
failures (CASSANDRA-16616)
 + * Add back the source release of python driver in tree to avoid fetching 
from GitHub APIs (CASSANDRA-16599)
 + * Fix false unavailable for queries due to cluster topology changes 
(CASSANDRA-16545)
 + * Fixed a race condition issue in nodetool repair where we poll for the 
error before seeing the error notification, leading to a less meaningful 
message (CASSANDRA-16585)
 + * Fix mixed cluster GROUP BY queries (CASSANDRA-16582)
 + * Upgrade jflex to 1.8.2 (CASSANDRA-16576)
 + * Binary releases no longer bundle the apidocs (javadoc) (CASSANDRA-15561)
 + * Fix Streaming Repair metrics (CASSANDRA-16190)
 + * Scheduled (delayed) schema pull tasks should not run after MIGRATION stage 
shutdown during decommission (CASSANDRA-16495)
 + * When behind a firewall trunk is not buildable, need to allow overriding 
URLs (CASSANDRA-16563)
 + * Make sure sstables with moved starts are removed correctly in 
LeveledGenerations (CASSANDRA-16552)
 + * Fix race between secondary index building and active compactions tracking 
(CASSANDRA-16554)
 + * Migrate dependency handling from maven-ant-tasks to resolver-ant-tasks, 
removing lib/ directory from version control (CASSANDRA-16391)
 + * Fix 4.0 node sending a repair prepare message to a 3.x node breaking the 
connection (CASSANDRA-16542)
 + * Removed synchronized modifier from StreamSession#onChannelClose to prevent 
deadlocking on flush (CASSANDRA-15892)
 + * Throw IOE in AbstractType.writeValue if value has wrong fixed length 
(CASSANDRA-16500)
 + * Execute background refreshing of auth caches on a dedicated executor 
(CASSANDRA-15177)
 + * Update bundled java and python drivers to 3.11.0 and 3.25.0 respectively 
(CASSANDRA-13951)
 + * Add io.netty.tryReflectionSetAccessible=true to j11 server options in 
order to enable netty to use Unsafe direct byte buffer construction 
(CASSANDRA-16493)
 + * Make cassandra-stress -node support host:port notation (CASSANDRA-16529)
 + * Better handle legacy gossip application states during (and after) upgrades 
(CASSANDRA-16525)
 + * Mark StreamingMetrics.ActiveOutboundStreams as deprecated (CASSANDRA-11174)
 + * Increase the cqlsh version number (CASSANDRA-16509)
 + * Fix the CQL generated for the views.where_clause column when some 
identifiers require quoting (CASSANDRA-16479)
 + * Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during 
startup (CASSANDRA-16425)
 + * Reinstate removed ApplicationState padding (CASSANDRA-16484)
 + * Expose data dirs to ColumnFamilyStoreMBean (CASSANDRA-16335)
 + * Add possibility to copy SSTables in SSTableImporter instead of moving them 
(CASSANDRA-16407)
 + * Fix DESCRIBE statement for CUSTOM indices with options (CASSANDRA-16482)
 + * Fix cassandra-stress JMX connection (CASSANDRA-16473)
 + * Avoid processing redundant application states on endpoint changes 
(CASSANDRA-16381)
 + * Prevent parent repair sessions leak (CASSANDRA-16446)
 + * Fix timestamp issue in SinglePartitionSliceCommandTest 
testPartitionD…eletionRowDeletionTie (CASSANDRA-16443)
 + * Promote protocol V5 out of beta (CASSANDRA-14973)
 + * Fix incorrect encoding for strings can be UTF8 (CASSANDRA-16429)
 + * Fix node unable to join when RF > N in multi-DC with added warning 
(CASSANDRA-16296)
 + * Add an option to nodetool tablestats to check sstable location correctness 
(CASSANDRA-16344) 
 + * Unable to ALTER KEYSPACE while decommissioned/assassinated nodes are in 
gossip (CASSANDRA-16422)
 + * Metrics backward compatibility restored after CASSANDRA-15066 
(CASSANDRA-16083)
 + * Reduce new reserved keywords introduced since 3.0 (CASSANDRA-16439)
 + * Improve system tables handling in case of disk failures (CASSANDRA-14793)
 + * Add access and datacenters to unreserved keywords (CASSANDRA-16398)
 + * Fix nodetool ring, status output when DNS resolution or port printing are 
in use (CASSANDRA-16283)
 + * Upgrade Jacoco to 0.8.6 (for Java 11 support) (CASSANDRA-16365)
 + * Move excessive repair debug loggings to trace level (CASSANDRA-16406)
 + * Restore validation of each message's protocol version (CASSANDRA-16374)
 + * Upgrade netty and chronicle-queue dependencies to get Auditing and native 
library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392)
 + * Release StreamingTombstoneHistogramBuilder spool when switching writers 
(CASSANDRA-14834)
 + * Correct memtable on-heap size calculations to match actual use 
(CASSANDRA-16318)
 + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
 + * Too defensive check when picking sstables for preview repair 
(CASSANDRA-16284)
 + * Ensure pre-negotiation native protocol responses have correct stream id 
(CASSANDRA-16376)
 + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
 + * SSLFactory should initialize SSLContext before setting protocols 
(CASSANDRA-16362)
 + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, 
in the cassandra-all pom (CASSANDRA-16303)
 + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
 +Merged from 3.11:
   * Add autocomplete and error messages for provide_overlapping_tombstones 
(CASSANDRA-16350)
 - * Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) 
(CASSANDRA-16447)
   * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
 + * Fix digest computation for queries with fetched but non queried columns 
(CASSANDRA-15962)
 + * Reduce amount of allocations during batch statement execution 
(CASSANDRA-16201)
 + * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
  Merged from 3.0:
 - * Fix materialized view builders inserting truncated data (CASSANDRA-16567)
   * Don't wait for schema migrations from removed nodes (CASSANDRA-16577)
 - * Scheduled (delayed) schema pull tasks should not run after MIGRATION stage 
shutdown during decommission (CASSANDRA-16495)
   * Ignore trailing zeros in hint files (CASSANDRA-16523)
   * Refuse DROP COMPACT STORAGE if some 2.x sstables are in use 
(CASSANDRA-15897)
   * Fix ColumnFilter::toString not returning a valid CQL fragment 
(CASSANDRA-16483)
diff --cc src/java/org/apache/cassandra/db/view/TableViews.java
index 5b88fc7,5636192..cc58dc1
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@@ -21,9 -21,7 +21,8 @@@ import java.util.*
  import java.util.concurrent.CopyOnWriteArrayList;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;
 +import java.util.stream.Collectors;
  
- import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Iterators;
  import com.google.common.collect.PeekingIterator;
@@@ -89,10 -79,20 +88,20 @@@ public class TableViews extends Abstrac
  
      public Iterable<ColumnFamilyStore> allViewsCfs()
      {
 -        Keyspace keyspace = Keyspace.open(baseTableMetadata.ksName);
 -        return Iterables.transform(views, view -> 
keyspace.getColumnFamilyStore(view.getDefinition().viewName));
 +        Keyspace keyspace = Keyspace.open(baseTableMetadata.keyspace);
 +        return Iterables.transform(views, view -> 
keyspace.getColumnFamilyStore(view.getDefinition().name()));
      }
  
+     public void build()
+     {
+         views.forEach(View::build);
+     }
+ 
+     public void stopBuild()
+     {
+         views.forEach(View::stopBuild);
+     }
+ 
      public void forceBlockingFlush()
      {
          for (ColumnFamilyStore viewCfs : allViewsCfs())
diff --cc src/java/org/apache/cassandra/db/view/View.java
index 3920b70,6653836..d813d0e
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@@ -220,8 -214,9 +220,8 @@@ public class Vie
      {
          if (builder != null)
          {
-             logger.debug("Stopping current view builder due to schema 
change");
+             logger.debug("Stopping current view builder due to schema change 
or truncate");
              builder.stop();
 -            builder.waitForCompletion();
              builder = null;
          }
      }
diff --cc test/unit/org/apache/cassandra/cql3/ViewTest.java
index f792cfd,38b1a35..963a4b6
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@@ -36,25 -36,43 +36,36 @@@ import org.junit.runner.RunWith
  import com.datastax.driver.core.ResultSet;
  import com.datastax.driver.core.Row;
  import com.datastax.driver.core.exceptions.InvalidQueryException;
 -import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.Util;
 -import org.apache.cassandra.concurrent.SEPExecutor;
 +import com.datastax.driver.core.exceptions.OperationTimedOutException;
- import org.apache.cassandra.concurrent.SEPExecutor;
  import org.apache.cassandra.concurrent.Stage;
 -import org.apache.cassandra.concurrent.StageManager;
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.view.View;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.ColumnMetadata;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.db.marshal.AsciiType;
 -import org.apache.cassandra.db.view.View;
 -import org.apache.cassandra.exceptions.SyntaxException;
 -import org.apache.cassandra.schema.KeyspaceParams;
  import org.apache.cassandra.service.ClientWarn;
--import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.utils.FBUtilities;
- 
- 
++import org.awaitility.Awaitility;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMRules;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
 -import static junit.framework.Assert.fail;
++import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+ import static org.assertj.core.api.Assertions.assertThat;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ @RunWith(BMUnitRunner.class)
  public class ViewTest extends CQLTester
  {
-     ProtocolVersion protocolVersion = ProtocolVersion.V4;
+     /** Latch used by {@link #testTruncateWhileBuilding()} Byteman 
injections. */
+     @SuppressWarnings("unused")
+     private static final CountDownLatch blockViewBuild = new 
CountDownLatch(1);
+ 
 -    private static final ProtocolVersion protocolVersion = 
ProtocolVersion.CURRENT;
      private final List<String> views = new ArrayList<>();
  
      @BeforeClass
@@@ -72,34 -90,27 +83,38 @@@
      public void end() throws Throwable
      {
          for (String viewName : views)
--            executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + viewName);
++            executeNet("DROP MATERIALIZED VIEW " + viewName);
      }
  
      private void createView(String name, String query) throws Throwable
      {
 -        executeNet(protocolVersion, String.format(query, name));
 -        // If exception is thrown, the view will not be added to the list; 
since it shouldn't have been created, this is
 -        // the desired behavior
 -        views.add(name);
 +        try
 +        {
-             executeNet(protocolVersion, String.format(query, name));
++            executeNet(String.format(query, name));
 +            // If exception is thrown, the view will not be added to the 
list; since it shouldn't have been created, this is
 +            // the desired behavior
 +            views.add(name);
 +        }
 +        catch (OperationTimedOutException ex)
 +        {
 +            // ... except for timeout, when we actually do not know whether 
the view was created or not
 +            views.add(name);
 +            throw ex;
 +        }
      }
  
      private void updateView(String query, Object... params) throws Throwable
      {
--        executeNet(protocolVersion, query, params);
-         while (!(((SEPExecutor) 
Stage.VIEW_MUTATION.executor()).getPendingTaskCount() == 0
-                 && ((SEPExecutor) 
Stage.VIEW_MUTATION.executor()).getActiveTaskCount() == 0))
-         {
-             Thread.sleep(1);
-         }
++        executeNet(query, params);
+         waitForViewMutations();
+     }
+ 
+     private void waitForViewMutations()
+     {
 -        SEPExecutor executor = (SEPExecutor) 
StageManager.getStage(Stage.VIEW_MUTATION);
 -        Util.spinAssertEquals(0L, () -> executor.getPendingTasks() + 
executor.getActiveCount(), 60);
++        Awaitility.await()
++                  .atMost(5, TimeUnit.MINUTES)
++                  .until(() -> 
Stage.VIEW_MUTATION.executor().getPendingTaskCount() == 0
++                               && 
Stage.VIEW_MUTATION.executor().getActiveTaskCount() == 0);
      }
  
      @Test
@@@ -129,7 -140,7 +144,7 @@@
          createTable("CREATE TABLE %s (k1 int, c1 int, c2 int, v1 int, v2 int, 
PRIMARY KEY (k1, c1, c2))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("view1",
                     "CREATE MATERIALIZED VIEW view1 AS SELECT * FROM %%s WHERE 
k1 IS NOT NULL AND c1 IS NOT NULL AND c2 IS NOT NULL PRIMARY KEY (k1, c2, c1)");
@@@ -166,9 -177,9 +181,9 @@@
          createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY 
(k1, c1))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
 -        createView("view1", "CREATE MATERIALIZED VIEW view1 AS SELECT k1 FROM 
%%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL PRIMARY KEY 
(val, k1, c1)");
 +        createView("view1", "CREATE MATERIALIZED VIEW view1 AS SELECT k1, c1, 
val FROM %%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL 
PRIMARY KEY (val, k1, c1)");
  
          updateView("INSERT INTO %s (k1, c1, val) VALUES (1, 2, 200)");
          updateView("INSERT INTO %s (k1, c1, val) VALUES (1, 3, 300)");
@@@ -188,9 -200,9 +203,9 @@@
          createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY 
(k1, c1))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
 -        createView("view1", "CREATE MATERIALIZED VIEW view1 AS SELECT k1 FROM 
%%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL PRIMARY KEY 
(val, k1, c1)");
 +        createView("view1", "CREATE MATERIALIZED VIEW view1 AS SELECT val, 
k1, c1 FROM %%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL 
PRIMARY KEY (val, k1, c1)");
  
      }
  
@@@ -200,9 -212,9 +215,9 @@@
          createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY 
(k1, c1))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
 -        createView("view1", "CREATE MATERIALIZED VIEW view1 AS SELECT k1 FROM 
%%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL PRIMARY KEY 
(val, k1, c1)");
 +        createView("view1", "CREATE MATERIALIZED VIEW view1 AS SELECT k1, c1, 
val FROM %%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL 
PRIMARY KEY (val, k1, c1)");
  
          updateView("INSERT INTO %s (k1, c1, val) VALUES (1, 2, 200)");
          updateView("INSERT INTO %s (k1, c1, val) VALUES (1, 3, 300)");
@@@ -226,7 -238,7 +241,7 @@@
                      "PRIMARY KEY((k, asciival)))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          // Must include "IS NOT NULL" for primary keys
          try
@@@ -286,7 -291,7 +301,7 @@@
                      "PRIMARY KEY(k,c))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          try
          {
@@@ -340,7 -345,7 +355,7 @@@
                      "PRIMARY KEY(k,c))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv_tstest", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM 
%%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY 
(val,k,c)");
  
@@@ -379,7 -384,7 +394,7 @@@
                      "val int)");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv_rctstest", "CREATE MATERIALIZED VIEW %s AS SELECT * 
FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (k,c)");
  
@@@ -408,7 -413,7 +423,7 @@@
                      "count counter)");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          try
          {
@@@ -428,7 -456,7 +443,7 @@@
                      "result duration)");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          try
          {
@@@ -458,25 -486,28 +473,25 @@@
          createTable("CREATE TABLE %s (a int, b int, c int, d int, e int, 
PRIMARY KEY (a, b))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
          Keyspace ks = Keyspace.open(keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, a, b)");
          ks.getColumnFamilyStore("mv").disableAutoCompaction();
  
          //Set initial values TS=0, leaving e null and verify view
--        executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d) VALUES (0, 
0, 1, 0) USING TIMESTAMP 0");
++        executeNet("INSERT INTO %s (a, b, c, d) VALUES (0, 0, 1, 0) USING 
TIMESTAMP 0");
          assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = 
?", 1, 0, 0), row(0));
  
 -        if (flush)
 -            FBUtilities.waitOnFutures(ks.flush());
 -
          //update c's timestamp TS=2
--        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? 
WHERE a = ? and b = ? ", 1, 0, 0);
++        executeNet("UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? and b = 
? ", 1, 0, 0);
          assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = 
?", 1, 0, 0), row(0));
  
          if (flush)
              FBUtilities.waitOnFutures(ks.flush());
  
          // change c's value and TS=3, tombstones c=1 and adds c=0 record
--        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? 
WHERE a = ? and b = ? ", 0, 0, 0);
++        executeNet("UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? and b = 
? ", 0, 0, 0);
          if (flush)
              FBUtilities.waitOnFutures(ks.flush());
          assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = 
?", 1, 0, 0));
@@@ -489,7 -520,7 +504,7 @@@
  
  
          //change c's value back to 1 with TS=4, check we can see d
--        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 4 SET c = ? 
WHERE a = ? and b = ? ", 1, 0, 0);
++        executeNet("UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE a = ? and b = 
? ", 1, 0, 0);
          if (flush)
          {
              ks.getColumnFamilyStore("mv").forceMajorCompaction();
@@@ -500,7 -531,7 +515,7 @@@
  
  
          //Add e value @ TS=1
--        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 1 SET e = ? 
WHERE a = ? and b = ? ", 1, 0, 0);
++        executeNet("UPDATE %s USING TIMESTAMP 1 SET e = ? WHERE a = ? and b = 
? ", 1, 0, 0);
          assertRows(execute("SELECT d,e from mv WHERE c = ? and a = ? and b = 
?", 1, 0, 0), row(0, 1));
  
          if (flush)
@@@ -508,7 -539,7 +523,7 @@@
  
  
          //Change d value @ TS=2
--        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET d = ? 
WHERE a = ? and b = ? ", 2, 0, 0);
++        executeNet("UPDATE %s USING TIMESTAMP 2 SET d = ? WHERE a = ? and b = 
? ", 2, 0, 0);
          assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = 
?", 1, 0, 0), row(2));
  
          if (flush)
@@@ -516,16 -547,16 +531,16 @@@
  
  
          //Change d value @ TS=3
--        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? 
WHERE a = ? and b = ? ", 1, 0, 0);
++        executeNet("UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? and b = 
? ", 1, 0, 0);
          assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = 
?", 1, 0, 0), row(1));
  
  
          //Tombstone c
--        executeNet(protocolVersion, "DELETE FROM %s WHERE a = ? and b = ?", 
0, 0);
++        executeNet("DELETE FROM %s WHERE a = ? and b = ?", 0, 0);
          assertRows(execute("SELECT d from mv"));
  
          //Add back without D
--        executeNet(protocolVersion, "INSERT INTO %s (a, b, c) VALUES (0, 0, 
1)");
++        executeNet("INSERT INTO %s (a, b, c) VALUES (0, 0, 1)");
  
          //Make sure D doesn't pop back in.
          assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = 
?", 1, 0, 0), row((Object) null));
@@@ -533,24 -564,24 +548,24 @@@
  
          //New partition
          // insert a row with timestamp 0
--        executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d, e) VALUES 
(?, ?, ?, ?, ?) USING TIMESTAMP 0", 1, 0, 0, 0, 0);
++        executeNet("INSERT INTO %s (a, b, c, d, e) VALUES (?, ?, ?, ?, ?) 
USING TIMESTAMP 0", 1, 0, 0, 0, 0);
  
          // overwrite pk and e with timestamp 1, but don't overwrite d
--        executeNet(protocolVersion, "INSERT INTO %s (a, b, c, e) VALUES (?, 
?, ?, ?) USING TIMESTAMP 1", 1, 0, 0, 0);
++        executeNet("INSERT INTO %s (a, b, c, e) VALUES (?, ?, ?, ?) USING 
TIMESTAMP 1", 1, 0, 0, 0);
  
          // delete with timestamp 0 (which should only delete d)
--        executeNet(protocolVersion, "DELETE FROM %s USING TIMESTAMP 0 WHERE a 
= ? AND b = ?", 1, 0);
++        executeNet("DELETE FROM %s USING TIMESTAMP 0 WHERE a = ? AND b = ?", 
1, 0);
          assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = 
? and b = ?", 0, 1, 0),
                     row(1, 0, 0, null, 0)
          );
  
--        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? 
WHERE a = ? AND b = ?", 1, 1, 0);
--        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? 
WHERE a = ? AND b = ?", 0, 1, 0);
++        executeNet("UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? AND b = 
?", 1, 1, 0);
++        executeNet("UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? AND b = 
?", 0, 1, 0);
          assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = 
? and b = ?", 0, 1, 0),
                     row(1, 0, 0, null, 0)
          );
  
--        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? 
WHERE a = ? AND b = ?", 0, 1, 0);
++        executeNet("UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? AND b = 
?", 0, 1, 0);
          assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = 
? and b = ?", 0, 1, 0),
                     row(1, 0, 0, 0, 0)
          );
@@@ -568,7 -599,7 +583,7 @@@
                      "PRIMARY KEY (k, c))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
  
          for(int i = 0; i < 1024; i++)
@@@ -597,7 -628,7 +612,7 @@@
                      ")");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv_test1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM 
%%s WHERE textval2 IS NOT NULL AND k IS NOT NULL AND asciival IS NOT NULL AND 
bigintval IS NOT NULL AND textval1 IS NOT NULL PRIMARY KEY ((textval2, k), 
asciival, bigintval, textval1)");
  
@@@ -644,7 -675,7 +659,7 @@@
                      ")");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE textval1 IS NOT NULL AND k IS NOT NULL AND asciival IS NOT NULL AND 
bigintval IS NOT NULL PRIMARY KEY ((textval1, k), asciival, bigintval)");
  
@@@ -677,7 -708,7 +692,7 @@@
                      ")");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE textval1 IS NOT NULL AND k IS NOT NULL AND asciival IS NOT NULL AND 
bigintval IS NOT NULL PRIMARY KEY ((textval1, k), asciival, bigintval)");
  
@@@ -707,12 -738,12 +722,12 @@@
                      "bigintval bigint, " +
                      "PRIMARY KEY((k, asciival)))");
  
 -        CFMetaData metadata = currentTableMetadata();
 +        TableMetadata metadata = currentTableMetadata();
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
 -        for (ColumnDefinition def : new HashSet<>(metadata.allColumns()))
 +        for (ColumnMetadata def : new HashSet<>(metadata.columns()))
          {
              try
              {
@@@ -829,7 -860,7 +844,7 @@@
                      "PRIMARY KEY (k))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE k IS NOT NULL AND intval IS NOT NULL PRIMARY KEY (intval, k)");
  
@@@ -849,7 -880,7 +864,7 @@@
          createTable("CREATE TABLE %s (k int, intval int,  listval 
frozen<list<tuple<text,text>>>, PRIMARY KEY (k))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv",
                     "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k 
IS NOT NULL AND listval IS NOT NULL PRIMARY KEY (k, listval)");
@@@ -885,7 -916,7 +900,7 @@@
                      "PRIMARY KEY (k))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE k IS NOT NULL AND intval IS NOT NULL PRIMARY KEY (intval, k)");
  
@@@ -911,7 -942,7 +926,7 @@@
                      "PRIMARY KEY (a, b))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT a, b, c FROM 
%%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b, a)");
  
@@@ -949,7 -980,7 +964,7 @@@
                      "d int," +
                      "PRIMARY KEY (a, b))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
  
@@@ -959,7 -990,7 +974,7 @@@
          updateView("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 1, 1, 2);
  
          Thread.sleep(TimeUnit.SECONDS.toMillis(5));
--        List<Row> results = executeNet(protocolVersion, "SELECT d FROM mv 
WHERE c = 2 AND a = 1 AND b = 1").all();
++        List<Row> results = executeNet("SELECT d FROM mv WHERE c = 2 AND a = 
1 AND b = 1").all();
          Assert.assertEquals(1, results.size());
          Assert.assertTrue("There should be a null result given back due to 
ttl expiry", results.get(0).isNull(0));
      }
@@@ -974,14 -1005,14 +989,14 @@@
                      "d int," +
                      "PRIMARY KEY (a, b))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
  
          updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) USING TTL 
3", 1, 1, 1, 1);
  
          Thread.sleep(TimeUnit.SECONDS.toMillis(4));
--        Assert.assertEquals(0, executeNet(protocolVersion, "SELECT * FROM mv 
WHERE c = 1 AND a = 1 AND b = 1").all().size());
++        Assert.assertEquals(0, executeNet("SELECT * FROM mv WHERE c = 1 AND a 
= 1 AND b = 1").all().size());
      }
  
      @Test
@@@ -994,14 -1025,14 +1009,14 @@@
                      "d int," +
                      "PRIMARY KEY (a, b))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
  
          String table = keyspace() + "." + currentTable();
          updateView("DELETE FROM " + table + " USING TIMESTAMP 6 WHERE a = 1 
AND b = 1;");
          updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) USING 
TIMESTAMP 3", 1, 1, 1, 1);
--        Assert.assertEquals(0, executeNet(protocolVersion, "SELECT * FROM mv 
WHERE c = 1 AND a = 1 AND b = 1").all().size());
++        Assert.assertEquals(0, executeNet("SELECT * FROM mv WHERE c = 1 AND a 
= 1 AND b = 1").all().size());
      }
  
      @Test
@@@ -1013,7 -1044,7 +1028,7 @@@
                      "c int," +
                      "PRIMARY KEY (a, b))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
  
@@@ -1022,11 -1053,11 +1037,11 @@@
              updateView("INSERT INTO %s (a, b, c) VALUES (?, ?, ?) USING 
TIMESTAMP 1", 1, 1, i);
          }
  
--        ResultSet mvRows = executeNet(protocolVersion, "SELECT c FROM mv");
--        List<Row> rows = executeNet(protocolVersion, "SELECT c FROM 
%s").all();
++        ResultSet mvRows = executeNet("SELECT c FROM mv");
++        List<Row> rows = executeNet("SELECT c FROM %s").all();
          Assert.assertEquals("There should be exactly one row in base", 1, 
rows.size());
          int expected = rows.get(0).getInt("c");
--        assertRowsNet(protocolVersion, mvRows, row(expected));
++        assertRowsNet(mvRows, row(expected));
      }
  
      @Test
@@@ -1040,35 -1071,35 +1055,27 @@@
                      "PRIMARY KEY (a, b, c))" +
                      "WITH CLUSTERING ORDER BY (b ASC, c DESC)");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
 -        createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (a, b, c) WITH CLUSTERING 
ORDER BY (b DESC)");
 -        createView("mv2", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (a, c, b) WITH CLUSTERING 
ORDER BY (c ASC)");
 -        createView("mv3", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (a, b, c)");
 -        createView("mv4", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (a, c, b) WITH CLUSTERING 
ORDER BY (c DESC)");
 +        createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (a, b, c) 
WITH CLUSTERING ORDER BY (b DESC, c ASC)");
 +        createView("mv2", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (a, c, b) 
WITH CLUSTERING ORDER BY (c ASC, b ASC)");
 +        createView("mv3", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (a, b, c)");
 +        createView("mv4", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (a, c, b) 
WITH CLUSTERING ORDER BY (c DESC, b ASC)");
  
          updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 
1, 1);
          updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 2, 
2, 2);
  
--        ResultSet mvRows = executeNet(protocolVersion, "SELECT b FROM mv1");
--        assertRowsNet(protocolVersion, mvRows,
--                      row(2),
--                      row(1));
--
--        mvRows = executeNet(protocolVersion, "SELECT c FROM mv2");
--        assertRowsNet(protocolVersion, mvRows,
--                      row(1),
--                      row(2));
--
--        mvRows = executeNet(protocolVersion, "SELECT b FROM mv3");
--        assertRowsNet(protocolVersion, mvRows,
--                      row(1),
--                      row(2));
--
--        mvRows = executeNet(protocolVersion, "SELECT c FROM mv4");
--        assertRowsNet(protocolVersion, mvRows,
--                      row(2),
--                      row(1));
++        ResultSet mvRows = executeNet("SELECT b FROM mv1");
++        assertRowsNet(mvRows, row(2), row(1));
++
++        mvRows = executeNet("SELECT c FROM mv2");
++        assertRowsNet(mvRows, row(1), row(2));
++
++        mvRows = executeNet("SELECT b FROM mv3");
++        assertRowsNet(mvRows, row(1), row(2));
++
++        mvRows = executeNet("SELECT c FROM mv4");
++        assertRowsNet(mvRows, row(2), row(1));
      }
  
      @Test
@@@ -1079,27 -1110,27 +1086,24 @@@
                      "b int," +
                      "PRIMARY KEY (a, b))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
 -        createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE b IS NOT NULL PRIMARY KEY (b, a)");
 +        createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b, a)");
  
          updateView("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 1);
          updateView("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 2);
          updateView("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 3);
  
--        ResultSet mvRows = executeNet(protocolVersion, "SELECT a, b FROM 
mv1");
--        assertRowsNet(protocolVersion, mvRows,
--                      row(1, 1),
--                      row(1, 2),
--                      row(1, 3));
++        ResultSet mvRows = executeNet("SELECT a, b FROM mv1");
++        assertRowsNet(mvRows, row(1, 1), row(1, 2), row(1, 3));
  
          updateView(String.format("BEGIN UNLOGGED BATCH " +
                                   "DELETE FROM %s WHERE a = 1 AND b > 1 AND b 
< 3;" +
                                   "DELETE FROM %s WHERE a = 1;" +
                                   "APPLY BATCH", currentTable(), 
currentTable()));
  
--        mvRows = executeNet(protocolVersion, "SELECT a, b FROM mv1");
--        assertRowsNet(protocolVersion, mvRows);
++        mvRows = executeNet("SELECT a, b FROM mv1");
++        assertRowsNet(mvRows);
      }
  
      @Test
@@@ -1110,15 -1141,15 +1114,15 @@@
                      "b int," +
                      "PRIMARY KEY (a, b))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          // Cannot use SELECT *, as those are always handled by the includeAll 
shortcut in View.updateAffectsView
 -        createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM 
%%s WHERE b IS NOT NULL PRIMARY KEY (b, a)");
 +        createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM 
%%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b, a)");
  
          updateView("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 1);
  
--        ResultSet mvRows = executeNet(protocolVersion, "SELECT a, b FROM 
mv1");
--        assertRowsNet(protocolVersion, mvRows, row(1, 1));
++        ResultSet mvRows = executeNet("SELECT a, b FROM mv1");
++        assertRowsNet(mvRows, row(1, 1));
      }
  
      @Test
@@@ -1129,15 -1160,15 +1133,15 @@@
                      "b int," +
                      "PRIMARY KEY ((a, b)))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          // Cannot use SELECT *, as those are always handled by the includeAll 
shortcut in View.updateAffectsView
          createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM 
%%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b, a)");
  
          updateView("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 1);
  
--        ResultSet mvRows = executeNet(protocolVersion, "SELECT a, b FROM 
mv1");
--        assertRowsNet(protocolVersion, mvRows, row(1, 1));
++        ResultSet mvRows = executeNet("SELECT a, b FROM mv1");
++        assertRowsNet(mvRows, row(1, 1));
      }
  
      @Test
@@@ -1150,19 -1181,19 +1154,19 @@@
                      "d int," +
                      "PRIMARY KEY (a, b))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
          createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE a IS NOT NULL AND b IS NOT NULL AND d IS NOT NULL PRIMARY KEY (a, d, b)");
  
          updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 
0, 0);
--        ResultSet mvRows = executeNet(protocolVersion, "SELECT a, d, b, c 
FROM mv1");
--        assertRowsNet(protocolVersion, mvRows, row(0, 0, 0, 0));
++        ResultSet mvRows = executeNet("SELECT a, d, b, c FROM mv1");
++        assertRowsNet(mvRows, row(0, 0, 0, 0));
  
          updateView("DELETE c FROM %s WHERE a = ? AND b = ?", 0, 0);
--        mvRows = executeNet(protocolVersion, "SELECT a, d, b, c FROM mv1");
--        assertRowsNet(protocolVersion, mvRows, row(0, 0, 0, null));
++        mvRows = executeNet("SELECT a, d, b, c FROM mv1");
++        assertRowsNet(mvRows, row(0, 0, 0, null));
  
          updateView("DELETE d FROM %s WHERE a = ? AND b = ?", 0, 0);
--        mvRows = executeNet(protocolVersion, "SELECT a, d, b FROM mv1");
++        mvRows = executeNet("SELECT a, d, b FROM mv1");
          assertTrue(mvRows.isExhausted());
      }
  
@@@ -1176,19 -1207,19 +1180,19 @@@
                      "d int," +
                      "PRIMARY KEY (a, b))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
          createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE a IS NOT NULL AND b IS NOT NULL AND d IS NOT NULL PRIMARY KEY (d, a, b)");
  
          updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 
0, 0);
--        ResultSet mvRows = executeNet(protocolVersion, "SELECT a, d, b, c 
FROM mv1");
--        assertRowsNet(protocolVersion, mvRows, row(0, 0, 0, 0));
++        ResultSet mvRows = executeNet("SELECT a, d, b, c FROM mv1");
++        assertRowsNet(mvRows, row(0, 0, 0, 0));
  
          updateView("DELETE c FROM %s WHERE a = ? AND b = ?", 0, 0);
--        mvRows = executeNet(protocolVersion, "SELECT a, d, b, c FROM mv1");
--        assertRowsNet(protocolVersion, mvRows, row(0, 0, 0, null));
++        mvRows = executeNet("SELECT a, d, b, c FROM mv1");
++        assertRowsNet(mvRows, row(0, 0, 0, null));
  
          updateView("DELETE d FROM %s WHERE a = ? AND b = ?", 0, 0);
--        mvRows = executeNet(protocolVersion, "SELECT a, d, b FROM mv1");
++        mvRows = executeNet("SELECT a, d, b FROM mv1");
          assertTrue(mvRows.isExhausted());
      }
  
@@@ -1201,20 -1232,20 +1205,20 @@@
                      "c map<int, text>," +
                      "PRIMARY KEY (a))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
 -        createView("mvmap", "CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM 
%%s WHERE b IS NOT NULL PRIMARY KEY (b, a)");
++        executeNet("USE " + keyspace());
 +        createView("mvmap", "CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM 
%%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b, a)");
  
          updateView("INSERT INTO %s (a, b) VALUES (?, ?)", 0, 0);
--        ResultSet mvRows = executeNet(protocolVersion, "SELECT a, b FROM 
mvmap WHERE b = ?", 0);
--        assertRowsNet(protocolVersion, mvRows, row(0, 0));
++        ResultSet mvRows = executeNet("SELECT a, b FROM mvmap WHERE b = ?", 
0);
++        assertRowsNet(mvRows, row(0, 0));
  
          updateView("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 1, 1, map(1, 
"1"));
--        mvRows = executeNet(protocolVersion, "SELECT a, b FROM mvmap WHERE b 
= ?", 1);
--        assertRowsNet(protocolVersion, mvRows, row(1, 1));
++        mvRows = executeNet("SELECT a, b FROM mvmap WHERE b = ?", 1);
++        assertRowsNet(mvRows, row(1, 1));
  
          updateView("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 0, 0, map(0, 
"0"));
--        mvRows = executeNet(protocolVersion, "SELECT a, b FROM mvmap WHERE b 
= ?", 0);
--        assertRowsNet(protocolVersion, mvRows, row(0, 0));
++        mvRows = executeNet("SELECT a, b FROM mvmap WHERE b = ?", 0);
++        assertRowsNet(mvRows, row(0, 0));
      }
  
      @Test
@@@ -1253,7 -1283,7 +1257,7 @@@
      {
          createTable("CREATE TABLE %s (id1 int, id2 int, v1 text, v2 text, 
PRIMARY KEY (id1, id2))");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv",
                     "CREATE MATERIALIZED VIEW %s AS" +
@@@ -1265,16 -1295,16 +1269,16 @@@
  
          execute("INSERT INTO %s (id1, id2, v1, v2) VALUES (?, ?, ?, ?)", 0, 
1, "foo", "bar");
  
--        assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM %s"), row(0, 1, "foo", "bar"));
--        assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM mv"), row(0, "foo", 1, "bar"));
++        assertRowsNet(executeNet("SELECT * FROM %s"), row(0, 1, "foo", 
"bar"));
++        assertRowsNet(executeNet("SELECT * FROM mv"), row(0, "foo", 1, 
"bar"));
  
--        executeNet(protocolVersion, "UPDATE %s SET v1=? WHERE id1=? AND 
id2=?", null, 0, 1);
--        assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM %s"), row(0, 1, null, "bar"));
--        assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM mv"));
++        executeNet("UPDATE %s SET v1=? WHERE id1=? AND id2=?", null, 0, 1);
++        assertRowsNet(executeNet("SELECT * FROM %s"), row(0, 1, null, "bar"));
++        assertRowsNet(executeNet("SELECT * FROM mv"));
  
--        executeNet(protocolVersion, "UPDATE %s SET v2=? WHERE id1=? AND 
id2=?", "rab", 0, 1);
--        assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM %s"), row(0, 1, null, "rab"));
--        assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM mv"));
++        executeNet("UPDATE %s SET v2=? WHERE id1=? AND id2=?", "rab", 0, 1);
++        assertRowsNet(executeNet("SELECT * FROM %s"), row(0, 1, null, "rab"));
++        assertRowsNet(executeNet("SELECT * FROM mv"));
      }
  
      @Test
@@@ -1282,7 -1312,7 +1286,7 @@@
      {
          createTable("CREATE TABLE %s (\"token\" int PRIMARY KEY, \"keyspace\" 
int)");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          createView("mv",
                     "CREATE MATERIALIZED VIEW %s AS" +
@@@ -1293,19 -1323,19 +1297,19 @@@
  
          execute("INSERT INTO %s (\"token\", \"keyspace\") VALUES (?, ?)", 0, 
1);
  
--        assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM %s"), row(0, 1));
--        assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM mv"), row(1, 0));
++        assertRowsNet(executeNet("SELECT * FROM %s"), row(0, 1));
++        assertRowsNet(executeNet("SELECT * FROM mv"), row(1, 0));
      }
  
      public void testCreateMvWithTTL() throws Throwable
      {
          createTable("CREATE TABLE %s (" +
 -                "k int PRIMARY KEY, " +
 -                "c int, " +
 -                "val int) WITH default_time_to_live = 60");
 +                    "k int PRIMARY KEY, " +
 +                    "c int, " +
 +                    "val int) WITH default_time_to_live = 60");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          // Must NOT include "default_time_to_live" for Materialized View 
creation
          try
@@@ -1331,8 -1361,8 +1335,8 @@@
          // Must NOT include "default_time_to_live" on alter Materialized View
          try
          {
--            executeNet(protocolVersion, "ALTER MATERIALIZED VIEW %s WITH 
default_time_to_live = 30");
 -            fail("Should fail if TTL is provided while altering materialized 
view");
++            executeNet("ALTER MATERIALIZED VIEW %s WITH default_time_to_live 
= 30");
 +            Assert.fail("Should fail if TTL is provided while altering 
materialized view");
          }
          catch (Exception e)
          {
@@@ -1348,9 -1379,8 +1352,9 @@@
                      "PRIMARY KEY(k,c))");
  
          execute("USE " + keyspace());
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
 +        
CompactionManager.instance.setConcurrentViewBuilders(concurrentViewBuilders);
          CompactionManager.instance.setCoreCompactorThreads(1);
          CompactionManager.instance.setMaximumCompactorThreads(1);
          ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
@@@ -1433,7 -1465,7 +1437,7 @@@
      {
          createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)");
  
--        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
  
          boolean enableMaterializedViews = 
DatabaseDescriptor.getEnableMaterializedViews();
          try
@@@ -1454,19 -1486,75 +1458,76 @@@
      }
  
      @Test
 -    public void viewOnCompactTableTest() throws Throwable
 +    public void testQuotedIdentifiersInWhereClause() throws Throwable
      {
 -        createTable("CREATE TABLE %s (a int, b int, v int, PRIMARY KEY (a, 
b)) WITH COMPACT STORAGE");
 -        executeNet(protocolVersion, "USE " + keyspace());
 -        try
 -        {
 -            createView("mv",
 -                       "CREATE MATERIALIZED VIEW %s AS SELECT a, b, value 
FROM %%s WHERE b IS NOT NULL PRIMARY KEY (b, a)");
 -            fail("Should have thrown an exception");
 -        }
 -        catch (Throwable t)
 -        {
 -            Assert.assertEquals("Undefined column name value",
 -                                t.getMessage());
 -        }
 +        createTable("CREATE TABLE %s (\"theKey\" int, \"theClustering_1\" 
int, \"theClustering_2\" int, \"theValue\" int, PRIMARY KEY (\"theKey\", 
\"theClustering_1\", \"theClustering_2\"))");
 +
-         executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
 +
 +        createView("view1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE \"theKey\" IS NOT NULL AND \"theClustering_1\" IS NOT NULL AND 
\"theClustering_2\" IS NOT NULL AND \"theValue\" IS NOT NULL  PRIMARY KEY 
(\"theKey\", \"theClustering_1\", \"theClustering_2\");");
 +        createView("view2", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE \"theKey\" IS NOT NULL AND (\"theClustering_1\", \"theClustering_2\") = 
(1, 2) AND \"theValue\" IS NOT NULL  PRIMARY KEY (\"theKey\", 
\"theClustering_1\", \"theClustering_2\");");
 +        createView("view3", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE token(\"theKey\") > token(1) AND \"theClustering_1\" = 1 AND 
\"theClustering_2\" > 2 AND \"theValue\" IS NOT NULL  PRIMARY KEY (\"theKey\", 
\"theClustering_1\", \"theClustering_2\");");
 +
 +        assertRows(execute("SELECT where_clause FROM system_schema.views"),
 +                   row("\"theKey\" IS NOT NULL AND \"theClustering_1\" IS NOT 
NULL AND \"theClustering_2\" IS NOT NULL AND \"theValue\" IS NOT NULL"),
 +                   row("\"theKey\" IS NOT NULL AND (\"theClustering_1\", 
\"theClustering_2\") = (1, 2) AND \"theValue\" IS NOT NULL"),
 +                   row("token(\"theKey\") > token(1) AND \"theClustering_1\" 
= 1 AND \"theClustering_2\" > 2 AND \"theValue\" IS NOT NULL"));
      }
+ 
+     /**
+      * Tests that truncating a table stops the ongoing builds of its 
materialized views,
+      * so they don't write into the MV data that has been truncated in the 
base table.
+      *
+      * See CASSANDRA-16567 for further details.
+      */
+     @Test
+     @BMRules(rules = {
 -    @BMRule(name = "Block view builder",
 -    targetClass = "ViewBuilder",
++    @BMRule(name = "Block view builder tasks",
++    targetClass = "ViewBuilderTask",
+     targetMethod = "buildKey",
+     action = 
"com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
+              "(org.apache.cassandra.cql3.ViewTest.blockViewBuild);"),
 -    @BMRule(name = "Unblock view builder",
++    @BMRule(name = "Unblock view builder tasks",
+     targetClass = "ColumnFamilyStore",
+     targetMethod = "truncateBlocking",
+     action = "org.apache.cassandra.cql3.ViewTest.blockViewBuild.countDown();")
+     })
+     public void testTruncateWhileBuilding() throws Throwable
+     {
+         createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, 
c))");
+         execute("USE " + keyspace());
 -        executeNet(protocolVersion, "USE " + keyspace());
++        executeNet("USE " + keyspace());
+         execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 0, 0);
+         createView("mv",
+                    "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s " +
+                    "WHERE k IS NOT NULL AND c IS NOT NULL AND v IS NOT NULL " 
+
+                    "PRIMARY KEY (v, c, k)");
+ 
 -        // check that the delayed view builder is either running or pending,
 -        // and that it hasn't written anything yet
 -        assertThat(runningCompactions()).isPositive();
++        // check that the delayed view builder tasks are either running or 
pending,
++        // and that they haven't written anything yet
++        assertThat(runningViewBuilds()).isPositive();
+         assertFalse(SystemKeyspace.isViewBuilt(KEYSPACE, "mv"));
+         waitForViewMutations();
+         assertRows(execute("SELECT * FROM mv"));
+ 
 -        // truncate the view, this should unblock the view builder, wait for 
its cancellation,
++        // truncate the view, this should unblock the view builders, wait for 
their cancellation,
+         // drop the sstables and, finally, start a new view build
+         updateView("TRUNCATE %s");
+ 
+         // check that there aren't any rows after truncating
+         assertRows(execute("SELECT * FROM mv"));
+ 
 -        // check that the view builder finishes and that the view is still 
empty after that
 -        Util.spinAssertEquals(0, ViewTest::runningCompactions, 60);
++        // check that the view builder tasks finish and that the view is 
still empty after that
++        Awaitility.await().untilAsserted(() -> assertEquals(0, 
runningViewBuilds()));
+         assertTrue(SystemKeyspace.isViewBuilt(KEYSPACE, "mv"));
+         waitForViewMutations();
+         assertRows(execute("SELECT * FROM mv"));
+     }
+ 
 -    private static int runningCompactions()
++    private static int runningViewBuilds()
+     {
 -        return CompactionManager.instance.getPendingTasks() + 
CompactionManager.instance.getActiveCompactions();
++        return Metrics.getThreadPoolMetrics("ViewBuildExecutor")
++                      .map(p -> p.activeTasks.getValue() + 
p.pendingTasks.getValue())
++                      .orElse(0);
+     }
 -}
 +}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to