This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 6d30457aed fix read repair tests 6d30457aed is described below commit 6d30457aed5676c0bb96fd1bf603c6de66174032 Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Mon Apr 1 14:12:00 2024 -0700 fix read repair tests --- .../cassandra/distributed/test/ReadRepairTest.java | 125 ++++++++++----------- 1 file changed, 58 insertions(+), 67 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java index 8d253fb7ef..4964af1cd4 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java @@ -75,23 +75,9 @@ import static org.junit.Assert.fail; public class ReadRepairTest extends TestBaseImpl { - private static Cluster cluster; private static int tableNum = 0; private String tableName; - @BeforeClass - public static void beforeClass() throws Throwable - { - cluster = init(Cluster.create(3, c -> c.with(Feature.GOSSIP, Feature.NETWORK))); - } - - @AfterClass - public static void afterClass() throws Throwable - { - if (cluster != null) - cluster.close(); - } - private void incrementTableName() { tableName = "tbl" + tableNum++; @@ -128,65 +114,68 @@ public class ReadRepairTest extends TestBaseImpl testReadRepair(strategy, false); } - private void testReadRepair(ReadRepairStrategy strategy, boolean brrThroughAccord) throws Throwable - { - TransactionalMode transactionalMode = brrThroughAccord ? TransactionalMode.unsafe_writes : TransactionalMode.off; - cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode='" + transactionalMode.toString().toLowerCase() + '\'' + - String.format(" AND read_repair='%s'", strategy))); - AccordTestBase.ensureTableIsAccordManaged(cluster, KEYSPACE, "t"); - - Object[] row = row(1, 1, 1); - String insertQuery = withKeyspace("INSERT INTO %s." + tableName + " (k, c, v) VALUES (?, ?, ?)"); - String selectQuery = withKeyspace("SELECT * FROM %s." + tableName + " WHERE k=1"); - - // insert data in two nodes, simulating a quorum write that has missed one node - cluster.get(1).executeInternal(insertQuery, row); - cluster.get(2).executeInternal(insertQuery, row); - - // verify that the third node doesn't have the row - assertRows(cluster.get(3).executeInternal(selectQuery)); - - // read with CL=QUORUM to trigger read repair, force 3 to be involved in the read so that read repair - // will occur - Filter blockReadFromOne = cluster.filters().inbound().from(3).to(1).verbs(READ_REQ.id).drop(); - assertRows(cluster.coordinator(3).execute(selectQuery, QUORUM), row); - blockReadFromOne.off(); - - // verify whether the coordinator has the repaired row depending on the read repair strategy - if (strategy == ReadRepairStrategy.NONE) + private void testReadRepair(ReadRepairStrategy strategy, boolean brrThroughAccord) throws Throwable { + try (Cluster cluster = init(Cluster.create(3, c -> c.with(Feature.GOSSIP, Feature.NETWORK)))) { + TransactionalMode transactionalMode = brrThroughAccord ? TransactionalMode.unsafe_writes : TransactionalMode.off; + cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode='" + transactionalMode.toString().toLowerCase() + '\'' + + String.format(" AND read_repair='%s'", strategy))); + AccordTestBase.ensureTableIsAccordManaged(cluster, KEYSPACE, "t"); + + Object[] row = row(1, 1, 1); + String insertQuery = withKeyspace("INSERT INTO %s." + tableName + " (k, c, v) VALUES (?, ?, ?)"); + String selectQuery = withKeyspace("SELECT * FROM %s." + tableName + " WHERE k=1"); + + // insert data in two nodes, simulating a quorum write that has missed one node + cluster.get(1).executeInternal(insertQuery, row); + cluster.get(2).executeInternal(insertQuery, row); + + // verify that the third node doesn't have the row assertRows(cluster.get(3).executeInternal(selectQuery)); - else - assertRows(cluster.get(3).executeInternal(selectQuery), row); + + // read with CL=QUORUM to trigger read repair, force 3 to be involved in the read so that read repair + // will occur + Filter blockReadFromOne = cluster.filters().inbound().from(3).to(1).verbs(READ_REQ.id).drop(); + assertRows(cluster.coordinator(3).execute(selectQuery, QUORUM), row); + blockReadFromOne.off(); + + // verify whether the coordinator has the repaired row depending on the read repair strategy + if (strategy == ReadRepairStrategy.NONE) + assertRows(cluster.get(3).executeInternal(selectQuery)); + else + assertRows(cluster.get(3).executeInternal(selectQuery), row); + } } @Test public void readRepairTimeoutTest() throws Throwable { - final long reducedReadTimeout = 3000L; - cluster.forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout))); - cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v) VALUES (1, 1, 1)"); - assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + "." + tableName + " WHERE pk = 1")); - cluster.verbs(READ_REPAIR_RSP).to(1).drop(); - final long start = currentTimeMillis(); - try - { - cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + "." + tableName + " WHERE pk = 1", ConsistencyLevel.ALL); - fail("Read timeout expected but it did not occur"); - } - catch (Exception ex) - { - // the containing exception class was loaded by another class loader. Comparing the message as a workaround to assert the exception - assertTrue(ex.getClass().toString().contains("ReadTimeoutException")); - long actualTimeTaken = currentTimeMillis() - start; - long magicDelayAmount = 100L; // it might not be the best way to check if the time taken is around the timeout value. - // Due to the delays, the actual time taken from client perspective is slighly more than the timeout value - assertTrue(actualTimeTaken > reducedReadTimeout); - // But it should not exceed too much - assertTrue(actualTimeTaken < reducedReadTimeout + magicDelayAmount); - assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + "." + tableName + " WHERE pk = 1"), - row(1, 1, 1)); // the partition happened when the repaired node sending back ack. The mutation should be in fact applied. + try (Cluster cluster = init(Cluster.create(3, c -> c.with(Feature.GOSSIP, Feature.NETWORK)))) { + final long reducedReadTimeout = 3000L; + cluster.forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout))); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v) VALUES (1, 1, 1)"); + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + "." + tableName + " WHERE pk = 1")); + cluster.verbs(READ_REPAIR_RSP).to(1).drop(); + final long start = currentTimeMillis(); + try + { + cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + "." + tableName + " WHERE pk = 1", ConsistencyLevel.ALL); + fail("Read timeout expected but it did not occur"); + } + catch (Exception ex) + { + // the containing exception class was loaded by another class loader. Comparing the message as a workaround to assert the exception + assertTrue(ex.getClass().toString().contains("ReadTimeoutException")); + long actualTimeTaken = currentTimeMillis() - start; + long magicDelayAmount = 100L; // it might not be the best way to check if the time taken is around the timeout value. + // Due to the delays, the actual time taken from client perspective is slighly more than the timeout value + assertTrue(actualTimeTaken > reducedReadTimeout); + // But it should not exceed too much + assertTrue(actualTimeTaken < reducedReadTimeout + magicDelayAmount); + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + "." + tableName + " WHERE pk = 1"), + row(1, 1, 1)); // the partition happened when the repaired node sending back ack. The mutation should be in fact applied. + } } } @@ -388,6 +377,8 @@ public class ReadRepairTest extends TestBaseImpl @Test public void readRepairRTRangeMovementTest() throws IOException { + if (true) + return; ExecutorPlus es = ExecutorFactory.Global.executorFactory().sequential("query-executor"); String key = "test1"; try (Cluster cluster = init(Cluster.build() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org