This is an automated email from the ASF dual-hosted git repository.

bereng pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push: new ebe07db602 Test Failure: org.apache.cassandra.distributed.test.UpgradeSSTablesTest.truncateWhileUpgrading ebe07db602 is described below commit ebe07db6023493b4f0a1968af439fee1e664dbaf Author: Bereng <berenguerbl...@gmail.com> AuthorDate: Tue Mar 5 11:29:56 2024 +0100 Test Failure: org.apache.cassandra.distributed.test.UpgradeSSTablesTest.truncateWhileUpgrading patch by Berenguer Blasi; reviewed by Brandon Williams for CASSANDRA-19398 --- .../distributed/test/UpgradeSSTablesTest.java | 58 +++++++++++++++++++--- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java index 2bdcaf16c6..76c6c189c4 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java @@ -50,10 +50,11 @@ import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownL public class UpgradeSSTablesTest extends TestBaseImpl { + @Test public void upgradeSSTablesInterruptsOngoingCompaction() throws Throwable { - try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start())) + try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(CompactionLatchByteman::install).start())) { cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));"); cluster.get(1).acceptsOnInstance((String ks) -> { @@ -79,10 +80,15 @@ public class UpgradeSSTablesTest extends TestBaseImpl LogAction logAction = cluster.get(1).logs(); logAction.mark(); + Future<?> future = cluster.get(1).asyncAcceptsOnInstance((String ks) -> { ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl"); CompactionManager.instance.submitMaximal(cfs, 
FBUtilities.nowInSeconds(), false, OperationType.COMPACTION); }).apply(KEYSPACE); + + Assert.assertTrue(cluster.get(1).callOnInstance(() -> CompactionLatchByteman.starting.awaitUninterruptibly(1, TimeUnit.MINUTES))); + cluster.get(1).runOnInstance(() -> { + CompactionLatchByteman.start.decrement();}); Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl")); future.get(); Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty()); @@ -136,7 +142,7 @@ public class UpgradeSSTablesTest extends TestBaseImpl @Test public void cleanupDoesNotInterruptUpgradeSSTables() throws Throwable { - try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(BB::install).start())) + try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(UpgradeSStablesLatchByteman::install).start())) { cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));"); @@ -160,12 +166,12 @@ public class UpgradeSSTablesTest extends TestBaseImpl LogAction logAction = cluster.get(1).logs(); logAction.mark(); - // Start upgradingsstables - use BB to pause once inside ActiveCompactions.beginCompaction + // Start upgradingsstables - use UpgradeSStablesLatchByteman to pause once inside ActiveCompactions.beginCompaction Thread upgradeThread = new Thread(() -> { cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"); }); upgradeThread.start(); - Assert.assertTrue(cluster.get(1).callOnInstance(() -> BB.starting.awaitUninterruptibly(1, TimeUnit.MINUTES))); + Assert.assertTrue(cluster.get(1).callOnInstance(() -> UpgradeSStablesLatchByteman.starting.awaitUninterruptibly(1, TimeUnit.MINUTES))); // Start a scrub and make sure that it fails, log check later to make sure it was // because it cannot cancel the active upgrade sstables @@ -173,7 +179,7 @@ public class UpgradeSSTablesTest extends TestBaseImpl // Now resume the upgrade 
sstables so test can shut down cluster.get(1).runOnInstance(() -> { - BB.start.decrement(); + UpgradeSStablesLatchByteman.start.decrement(); }); upgradeThread.join(); @@ -186,7 +192,7 @@ public class UpgradeSSTablesTest extends TestBaseImpl @Test public void truncateWhileUpgrading() throws Throwable { - try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start())) + try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(UpgradeSStablesLatchByteman::install).start())) { cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) ")); cluster.get(1).acceptsOnInstance((String ks) -> { @@ -215,6 +221,8 @@ public class UpgradeSSTablesTest extends TestBaseImpl cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"); }); + Assert.assertTrue(cluster.get(1).callOnInstance(() -> UpgradeSStablesLatchByteman.starting.awaitUninterruptibly(1, TimeUnit.MINUTES))); + cluster.get(1).runOnInstance(() -> {UpgradeSStablesLatchByteman.start.decrement();}); cluster.schemaChange(withKeyspace("TRUNCATE %s.tbl")); upgrade.get(); Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty()); @@ -303,7 +311,7 @@ public class UpgradeSSTablesTest extends TestBaseImpl } } - public static class BB + public static class UpgradeSStablesLatchByteman { // Will be initialized in the context of the instance class loader static CountDownLatch starting = newCountDownLatch(1); @@ -313,7 +321,7 @@ public class UpgradeSSTablesTest extends TestBaseImpl { new ByteBuddy().rebase(ActiveCompactions.class) .method(named("beginCompaction")) - .intercept(MethodDelegation.to(BB.class)) + .intercept(MethodDelegation.to(UpgradeSStablesLatchByteman.class)) .make() .load(classLoader, ClassLoadingStrategy.Default.INJECTION); } @@ -336,4 +344,38 @@ public class UpgradeSSTablesTest extends TestBaseImpl } } } + + public static class CompactionLatchByteman + { + // Will be initialized in the 
context of the instance class loader + static CountDownLatch starting = newCountDownLatch(1); + static CountDownLatch start = newCountDownLatch(1); + + public static void install(ClassLoader classLoader, Integer num) + { + new ByteBuddy().rebase(ActiveCompactions.class) + .method(named("beginCompaction")) + .intercept(MethodDelegation.to(CompactionLatchByteman.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + } + + @SuppressWarnings("unused") + public static void beginCompaction(CompactionInfo.Holder ci, @SuperCall Callable<Void> zuperCall) + { + try + { + zuperCall.call(); + if (ci.getCompactionInfo().getTaskType() == OperationType.COMPACTION) + { + starting.decrement(); + Assert.assertTrue(start.awaitUninterruptibly(1, TimeUnit.MINUTES)); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org