This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 2723c91878cfd7005a53f6118015c484dacc0f32
Merge: b3842de5cf 2873c91269
Author: Andrés de la Peña <a.penya.gar...@gmail.com>
AuthorDate: Fri Apr 22 17:30:22 2022 +0100

    Merge branch 'cassandra-4.0' into trunk

 .../test/ReadRepairCollectionQueriesTest.java      |  236 ++++
 .../distributed/test/ReadRepairInQueriesTest.java  |  247 ++++
 .../test/ReadRepairPointQueriesTest.java           |   79 ++
 .../distributed/test/ReadRepairQueryTester.java    |  279 +++++
 .../distributed/test/ReadRepairQueryTypesTest.java | 1191 --------------------
 .../test/ReadRepairRangeQueriesTest.java           |  261 +++++
 .../test/ReadRepairSliceQueriesTest.java           |  145 +++
 .../test/ReadRepairUnrestrictedQueriesTest.java    |  116 ++
 8 files changed, 1363 insertions(+), 1191 deletions(-)

diff --cc 
test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
index 0000000000,10bf05021b..26516104fb
mode 000000,100644..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
@@@ -1,0 -1,280 +1,279 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.List;
+ 
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ 
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
+ 
 -import static java.util.concurrent.TimeUnit.MINUTES;
+ import static 
org.apache.cassandra.distributed.shared.AssertUtils.assertEquals;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ import static 
org.apache.cassandra.service.reads.repair.ReadRepairStrategy.NONE;
+ 
+ /**
+  * Base class for tests around read repair functionality with different query 
types and schemas.
+  * <p>
+  * Each test verifies that its tested query triggers read repair propagating 
mismatched rows/columns and row/column
+  * deletions. They also verify that the selected rows and columns are 
propagated through read repair on missmatch,
+  * and that unselected rows/columns are not repaired.
+  * <p>
+  * The tests are parameterized for:
+  * <ul>
+  *     <li><Both {@code NONE} and {@code BLOCKING} read repair stratregies/li>
+  *     <li>Data to be repaired residing on the query coordinator or a 
replica</li>
+  *     <li>Data to be repaired residing on memtables or flushed to 
sstables</li>
+  * </ul>
+  * <p>
+  * All derived tests follow a similar pattern:
+  * <ul>
+  *     <li>Create a keyspace with RF=2 and a table</li>
+  *     <li>Insert some data in only one of the nodes</li>
+  *     <li>Run the tested read query selecting a subset of the inserted 
columns with CL=ALL</li>
+  *     <li>Verify that the previous read has triggered read repair 
propagating only the queried columns</li>
+  *     <li>Run the tested read query again but this time selecting all the 
columns</li>
+  *     <li>Verify that the previous read has triggered read repair 
propagating the rest of the queried rows</li>
+  *     <li>Delete one of the involved columns in just one node</li>
+  *     <li>Run the tested read query again but this time selecting a column 
different to the deleted one</li>
+  *     <li>Verify that the previous read hasn't propagated the column 
deletion</li>
+  *     <li>Run the tested read query again selecting all the columns</li>
+  *     <li>Verify that the previous read has triggered read repair 
propagating the column deletion</li>
+  *     <li>Delete one of the involved rows in just one node</li>
+  *     <li>Run the tested read query again selecting all the columns</li>
+  *     <li>Verify that the previous read has triggered read repair 
propagating the row deletions</li>
+  *     <li>Verify the final status of each node and drop the table</li>
+  * </ul>
+  */
+ @RunWith(Parameterized.class)
+ public abstract class ReadRepairQueryTester extends TestBaseImpl
+ {
+     private static final int NUM_NODES = 2;
+ 
+     /**
+      * The read repair strategy to be used
+      */
+     @Parameterized.Parameter
+     public ReadRepairStrategy strategy;
+ 
+     /**
+      * The node to be used as coordinator
+      */
+     @Parameterized.Parameter(1)
+     public int coordinator;
+ 
+     /**
+      * Whether to flush data after mutations
+      */
+     @Parameterized.Parameter(2)
+     public boolean flush;
+ 
+     /**
+      * Whether paging is used for the distributed queries
+      */
+     @Parameterized.Parameter(3)
+     public boolean paging;
+ 
+     @Parameterized.Parameters(name = "{index}: strategy={0} coordinator={1} 
flush={2} paging={3}")
+     public static Collection<Object[]> data()
+     {
+         List<Object[]> result = new ArrayList<>();
+         for (int coordinator = 1; coordinator <= NUM_NODES; coordinator++)
+             for (boolean flush : BOOLEANS)
+                 for (boolean paging : BOOLEANS)
+                     result.add(new Object[]{ ReadRepairStrategy.BLOCKING, 
coordinator, flush, paging });
+         result.add(new Object[]{ ReadRepairStrategy.NONE, 1, false, false });
+         return result;
+     }
+ 
+     private static Cluster cluster;
+ 
+     @BeforeClass
+     public static void setupCluster() throws IOException
+     {
+         cluster = init(Cluster.build(NUM_NODES)
 -                              .withConfig(config -> 
config.set("read_request_timeout_in_ms", MINUTES.toMillis(1))
 -                                                          
.set("write_request_timeout_in_ms", MINUTES.toMillis(1)))
++                              .withConfig(config -> 
config.set("read_request_timeout", "1m")
++                                                          
.set("write_request_timeout", "1m"))
+                               .start());
+         cluster.schemaChange(withKeyspace("CREATE TYPE %s.udt (x int, y 
int)"));
+     }
+ 
+     @AfterClass
+     public static void teardownCluster()
+     {
+         if (cluster != null)
+             cluster.close();
+     }
+ 
+     protected Tester tester(String restriction)
+     {
+         return new Tester(restriction, cluster, strategy, coordinator, flush, 
paging);
+     }
+ 
+     protected static class Tester extends ReadRepairTester<Tester>
+     {
+         private final String restriction; // the tested CQL query WHERE 
restriction
+         private final String allColumnsQuery; // a SELECT * query for the 
table using the tested restriction
+ 
+         Tester(String restriction, Cluster cluster, ReadRepairStrategy 
strategy, int coordinator, boolean flush, boolean paging)
+         {
+             super(cluster, strategy, coordinator, flush, paging, false);
+             this.restriction = restriction;
+ 
+             allColumnsQuery = String.format("SELECT * FROM %s %s", 
qualifiedTableName, restriction);
+         }
+ 
+         @Override
+         Tester self()
+         {
+             return this;
+         }
+ 
+         /**
+          * Runs the tested query with CL=ALL selectig only the specified 
columns and verifies that it returns the
+          * specified rows. Then, it runs the query again selecting all the 
columns, and verifies that the first query
+          * execution only propagated the selected columns, and that the 
second execution propagated everything.
+          *
+          * @param columns                  the selected columns
+          * @param columnsQueryRepairedRows the expected number of repaired 
rows when querying only the selected columns
+          * @param rowsQueryRepairedRows    the expected number of repaired 
rows when querying all the columns
+          * @param columnsQueryResults      the rows returned by the query for 
a subset of columns
+          * @param node1Rows                the rows in the first node, which 
is the one with the most updated data
+          * @param node2Rows                the rows in the second node, which 
is the one meant to receive the RR writes
+          */
+         Tester queryColumns(String columns,
+                             long columnsQueryRepairedRows,
+                             long rowsQueryRepairedRows,
+                             Object[][] columnsQueryResults,
+                             Object[][] node1Rows,
+                             Object[][] node2Rows)
+         {
+             // query only the selected columns with CL=ALL to trigger partial 
read repair on that column
+             String columnsQuery = String.format("SELECT %s FROM %s %s", 
columns, qualifiedTableName, restriction);
+             assertRowsDistributed(columnsQuery, columnsQueryRepairedRows, 
columnsQueryResults);
+ 
+             // query entire rows to repair the rest of the columns, that 
might trigger new repairs for those columns
+             return verifyQuery(allColumnsQuery, rowsQueryRepairedRows, 
node1Rows, node2Rows);
+         }
+ 
+         /**
+          * Executes the specified column deletion on just one node. Then it 
runs the tested query with CL=ALL selectig
+          * only the specified columns (which are expected to be different to 
the deleted one) and verifies that it
+          * returns the specified rows. Then it runs the tested query again, 
this time selecting all the columns, to
+          * verify that the previous query didn't propagate the column 
deletion.
+          *
+          * @param columnDeletion           the deletion query for a first node
+          * @param columns                  a subset of the table columns for 
the first distributed query
+          * @param columnsQueryRepairedRows the expected number of repaired 
rows when querying only the selected columns
+          * @param rowsQueryRepairedRows    the expected number of repaired 
rows when querying all the columns
+          * @param columnsQueryResults      the rows returned by the query for 
a subset of columns
+          * @param node1Rows                the rows in the first node, which 
is the one with the most updated data
+          * @param node2Rows                the rows in the second node, which 
is the one meant to receive the RR writes
+          */
+         Tester deleteColumn(String columnDeletion,
+                             String columns,
+                             long columnsQueryRepairedRows,
+                             long rowsQueryRepairedRows,
+                             Object[][] columnsQueryResults,
+                             Object[][] node1Rows,
+                             Object[][] node2Rows)
+         {
+             assert restriction != null;
+ 
+             // execute the column deletion on just one node
+             mutate(1, columnDeletion);
+ 
+             // verify the columns read with CL=ALL, in most cases this won't 
propagate the previous column deletion if
+             // the deleted and read columns don't overlap
+             return queryColumns(columns,
+                                 columnsQueryRepairedRows,
+                                 rowsQueryRepairedRows,
+                                 columnsQueryResults,
+                                 node1Rows,
+                                 node2Rows);
+         }
+ 
+         /**
+          * Executes the specified row deletion on just one node and verifies 
the tested query, to ensure that the tested
+          * query propagates the row deletion.
+          */
+         Tester deleteRows(String rowDeletion, long repairedRows, Object[][] 
node1Rows, Object[][] node2Rows)
+         {
+             mutate(1, rowDeletion);
+             return verifyQuery(allColumnsQuery, repairedRows, node1Rows, 
node2Rows);
+         }
+ 
+         Tester mutate(String... queries)
+         {
+             return mutate(1, queries);
+         }
+ 
+         private Tester verifyQuery(String query, long expectedRepairedRows, 
Object[][] node1Rows, Object[][] node2Rows)
+         {
+             // verify the per-replica status before running the query 
distributedly
+             assertRows(cluster.get(1).executeInternal(query), node1Rows);
+             assertRows(cluster.get(2).executeInternal(query), strategy == 
NONE ? EMPTY_ROWS : node2Rows);
+ 
+             // now, run the query with CL=ALL to reconcile and repair the 
replicas
+             assertRowsDistributed(query, expectedRepairedRows, node1Rows);
+ 
+             // run the query locally again to verify that the distributed 
query has repaired everything
+             assertRows(cluster.get(1).executeInternal(query), node1Rows);
+             assertRows(cluster.get(2).executeInternal(query), strategy == 
NONE ? EMPTY_ROWS : node1Rows);
+ 
+             return this;
+         }
+ 
+         /**
+          * Verifies that the replicas are empty and drop the table.
+          */
+         void tearDown()
+         {
+             tearDown(0, rows(), rows());
+         }
+ 
+         /**
+          * Verifies the final status of the nodes with an unrestricted query, 
to ensure that the main tested query
+          * hasn't triggered any unexpected repairs. Then, it verifies that 
the node that hasn't been used as coordinator
+          * hasn't triggered any unexpected repairs. Finally, it drops the 
table.
+          */
+         void tearDown(long repairedRows, Object[][] node1Rows, Object[][] 
node2Rows)
+         {
+             verifyQuery("SELECT * FROM " + qualifiedTableName, repairedRows, 
node1Rows, node2Rows);
+             for (int n = 1; n <= cluster.size(); n++)
+             {
+                 if (n == coordinator)
+                     continue;
+ 
+                 long requests = readRepairRequestsCount(n);
+                 String message = String.format("No read repair requests were 
expected in not-coordinator nodes, " +
+                                                "but found %d requests in node 
%d", requests, n);
+                 assertEquals(message, 0, requests);
+             }
+             schemaChange("DROP TABLE " + qualifiedTableName);
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to