This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push: new d3ce825bf2 Fix org.apache.cassandra.distributed.test.trackwarnings.TombstoneCountWarningTest.failThresholdSinglePartition d3ce825bf2 is described below commit d3ce825bf2b376fd2516e4b594ddb69037c13159 Author: David Capwell <dcapw...@apache.org> AuthorDate: Fri Jun 3 14:37:39 2022 -0700 Fix org.apache.cassandra.distributed.test.trackwarnings.TombstoneCountWarningTest.failThresholdSinglePartition patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-17244 --- .../test/thresholds/TombstoneCountWarningTest.java | 113 +++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java index 04668a9589..5e409cbc31 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java @@ -20,10 +20,12 @@ package org.apache.cassandra.distributed.test.thresholds; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import com.google.common.collect.ImmutableSet; @@ -33,9 +35,17 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.SimpleStatement; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import 
net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.implementation.bind.annotation.This; +import org.apache.cassandra.concurrent.SEPExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.distributed.Cluster; @@ -49,16 +59,22 @@ import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.exceptions.TombstoneAbortException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.service.reads.ReadCallback; import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings; +import org.apache.cassandra.utils.Shared; import org.assertj.core.api.Assertions; import org.assertj.core.api.Condition; +import static net.bytebuddy.matcher.ElementMatchers.named; import static org.assertj.core.api.Assertions.assertThat; public class TombstoneCountWarningTest extends TestBaseImpl { + private static final Logger logger = LoggerFactory.getLogger(TombstoneCountWarningTest.class); + private static final int TOMBSTONE_WARN = 50; private static final int TOMBSTONE_FAIL = 100; private static ICluster<IInvokableInstance> CLUSTER; @@ -68,10 +84,12 @@ public class TombstoneCountWarningTest extends TestBaseImpl @BeforeClass public static void setupClass() throws IOException { + logger.info("[test step : @BeforeClass] setupClass"); Cluster.Builder builder = Cluster.build(3); builder.withConfig(c -> c.set("tombstone_warn_threshold", TOMBSTONE_WARN) .set("tombstone_failure_threshold", TOMBSTONE_FAIL) .with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP)); + builder.withInstanceInitializer(BB::install); CLUSTER = builder.start(); JAVA_DRIVER = JavaDriverUtils.create(CLUSTER); JAVA_DRIVER_SESSION = 
JAVA_DRIVER.connect(); @@ -80,6 +98,7 @@ public class TombstoneCountWarningTest extends TestBaseImpl @AfterClass public static void teardown() { + logger.info("[test step : @AfterClass] teardown"); if (JAVA_DRIVER_SESSION != null) JAVA_DRIVER_SESSION.close(); if (JAVA_DRIVER != null) @@ -89,6 +108,7 @@ public class TombstoneCountWarningTest extends TestBaseImpl @Before public void setup() { + logger.info("[test step : @Before] setup"); CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); init(CLUSTER); CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); @@ -102,12 +122,14 @@ public class TombstoneCountWarningTest extends TestBaseImpl @Test public void noWarningsSinglePartition() { + logger.info("[test step : @Test] noWarningsSinglePartition"); noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1"); } @Test public void noWarningsScan() { + logger.info("[test step : @Test] noWarningsScan"); noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1"); } @@ -134,12 +156,14 @@ public class TombstoneCountWarningTest extends TestBaseImpl @Test public void warnThresholdSinglePartition() { + logger.info("[test step : @Test] warnThresholdSinglePartition"); warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false); } @Test public void warnThresholdScan() { + logger.info("[test step : @Test] warnThresholdScan"); warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true); } @@ -183,12 +207,14 @@ public class TombstoneCountWarningTest extends TestBaseImpl @Test public void failThresholdSinglePartition() throws UnknownHostException { + logger.info("[test step : @Test] failThresholdSinglePartition"); failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false); } @Test public void failThresholdScan() throws UnknownHostException { + logger.info("[test step : @Test] failThresholdScan"); failThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true); } @@ -248,7 +274,10 @@ public class 
TombstoneCountWarningTest extends TestBaseImpl assertWarnAborts(0, 2, 1); + // when disabled warnings only happen if on the coordinator, and coordinator may not be the one replying + // to every query enable(false); + State.blockFor(CLUSTER.get(1).config().broadcastAddress()); warnings = CLUSTER.get(1).callsOnInstance(() -> { ClientWarn.instance.captureWarnings(); try @@ -259,7 +288,12 @@ public class TombstoneCountWarningTest extends TestBaseImpl catch (ReadFailureException e) { Assertions.assertThat(e).isNotInstanceOf(TombstoneAbortException.class); + Assertions.assertThat(e.failureReasonByEndpoint).isNotEmpty(); + Assertions.assertThat(e.failureReasonByEndpoint.values()) + .as("Non READ_TOO_MANY_TOMBSTONES exists") + .allMatch(RequestFailureReason.READ_TOO_MANY_TOMBSTONES::equals); } + logger.warn("Checking warnings..."); return ClientWarn.instance.getWarnings(); }).call(); // client warnings are currently coordinator only, so if present only 1 is expected @@ -276,6 +310,7 @@ public class TombstoneCountWarningTest extends TestBaseImpl assertWarnAborts(0, 2, 0); + State.blockFor(CLUSTER.get(1).config().broadcastAddress()); try { driverQueryAll(cql); @@ -327,4 +362,82 @@ public class TombstoneCountWarningTest extends TestBaseImpl { return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL)); } + + @Shared + public static class State + { + // use InetSocketAddress as InetAddressAndPort is @Isolated which means equality doesn't work due to different + // ClassLoaders; InetSocketAddress is @Shared so safe to use between app and cluster class loaders + public static volatile InetSocketAddress blockFor = null; + public static volatile CompletableFuture<Void> promise = null; + + // called on main thread + public static void blockFor(InetSocketAddress address) + { + blockFor = address; + promise = new CompletableFuture<>(); + } + + // called in C* threads; non-test threads + public static void 
onFailure(InetSocketAddress address) + { + if (address.equals(blockFor)) + promise.complete(null); + } + + // called on main thread + public static void syncAndClear() + { + if (blockFor != null) + { + promise.join(); + blockFor = null; + promise = null; + } + } + } + + public static class BB + { + private static void install(ClassLoader cl, int instanceId) + { + if (instanceId != 1) + return; + new ByteBuddy().rebase(ReadCallback.class) + .method(named("awaitResults")) + .intercept(MethodDelegation.to(BB.class)) + .method(named("onFailure")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + new ByteBuddy().rebase(SEPExecutor.class) + .method(named("maybeExecuteImmediately")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + @SuppressWarnings("unused") + public static void awaitResults(@SuperCall Runnable zuper) + { + State.syncAndClear(); + zuper.run(); + } + + @SuppressWarnings("unused") + public static void onFailure(InetAddressAndPort from, RequestFailureReason failureReason, @SuperCall Runnable zuper) throws Exception + { + State.onFailure(new InetSocketAddress(from.getAddress(), from.getPort())); + zuper.run(); + } + + // make sure to schedule the task rather than running inline... + // this is important as the read may block on the local version which can get the test to include it rather than + // block waiting, so by scheduling we make sure it's always fair + @SuppressWarnings("unused") + public static void maybeExecuteImmediately(Runnable task, @This SEPExecutor executor) + { + executor.execute(task); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org