This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new e0a6b83a02 When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters e0a6b83a02 is described below commit e0a6b83a02804bf976fdc43718001f23818ee53d Author: David Capwell <dcapw...@apache.org> AuthorDate: Mon Jul 25 12:26:35 2022 -0700 When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters patch by David Capwell; reviewed by Sam Tunnicliffe for CASSANDRA-17754 --- CHANGES.txt | 1 + .../cassandra/auth/CassandraRoleManager.java | 7 ++ .../org/apache/cassandra/service/StorageProxy.java | 19 ++- .../cassandra/distributed/shared/ClusterUtils.java | 46 +++++++ .../test/hostreplacement/FailedBootstrapTest.java | 138 +++++++++++++++++++++ .../test/hostreplacement/HostReplacementTest.java | 3 + 6 files changed, 212 insertions(+), 2 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 33753cb531..63e8fdd328 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters (CASSANDRA-17754) * Add guardrail to disallow DROP KEYSPACE commands (CASSANDRA-17767) * Remove ephemeral snapshot marker file and introduce a flag to SnapshotManifest (CASSANDRA-16911) * Add a virtual table that exposes currently running queries (CASSANDRA-15241) diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java index 0344de921d..c2272707ec 100644 --- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java +++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java @@ -43,6 +43,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import 
org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; @@ -386,6 +387,12 @@ public class CassandraRoleManager implements IRoleManager { // The delay is to give the node a chance to see its peers before attempting the operation ScheduledExecutors.optionalTasks.scheduleSelfRecurring(() -> { + if (!StorageProxy.isSafeToPerformRead()) + { + logger.trace("Setup task may not run due to it not being safe to perform reads... rescheduling"); + scheduleSetupTask(setupTask); + return; + } try { setupTask.call(); diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 557382df54..e89bdae717 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1823,7 +1823,7 @@ public class StorageProxy implements StorageProxyMBean public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { - if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.queries)) + if (!isSafeToPerformRead(group.queries)) { readMetrics.unavailables.mark(); readMetricsForLevel(consistencyLevel).unavailables.mark(); @@ -1850,6 +1850,16 @@ public class StorageProxy implements StorageProxyMBean : readRegular(group, consistencyLevel, queryStartNanoTime); } + public static boolean isSafeToPerformRead(List<SinglePartitionReadCommand> queries) + { + return isSafeToPerformRead() || systemKeyspaceQuery(queries); + } + + public static boolean isSafeToPerformRead() + 
{ + return !StorageService.instance.isBootstrapMode(); + } + private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException { @@ -2619,8 +2629,13 @@ public class StorageProxy implements StorageProxyMBean public static void logRequestException(Exception exception, Collection<? extends ReadCommand> commands) { + // Multiple different types of errors can happen, so by dedupping on the error type we can see each error + // case rather than just exposing the first error seen; this should make sure more rare issues are exposed + // rather than being hidden by more common errors such as timeout or unavailable + // see CASSANDRA-17754 + String msg = exception.getClass().getSimpleName() + " \"{}\" while executing {}"; NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, FAILURE_LOGGING_INTERVAL_SECONDS, TimeUnit.SECONDS, - "\"{}\" while executing {}", + msg, () -> new Object[] { exception.getMessage(), diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index dc280f3085..d848d201dc 100644 --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; @@ -40,6 +41,10 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import com.google.common.util.concurrent.Futures; + +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.gms.ApplicationState; +import 
org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.io.util.File; import org.junit.Assert; @@ -554,6 +559,47 @@ public class ClusterUtils }); } + public static void awaitGossipSchemaMatch(ICluster<? extends IInstance> cluster) + { + cluster.forEach(ClusterUtils::awaitGossipSchemaMatch); + } + + public static void awaitGossipSchemaMatch(IInstance instance) + { + if (!instance.config().has(Feature.GOSSIP)) + { + // when gossip isn't enabled, don't bother waiting on gossip to settle... + return; + } + awaitGossip(instance, "Schema IDs did not match", all -> { + String current = null; + for (Map.Entry<String, Map<String, String>> e : all.entrySet()) + { + Map<String, String> state = e.getValue(); + // has the instance joined? + String status = state.get(ApplicationState.STATUS_WITH_PORT.name()); + if (status == null) + status = state.get(ApplicationState.STATUS.name()); + if (status == null || !status.contains(VersionedValue.STATUS_NORMAL)) + continue; // ignore instances not joined yet + String schema = state.get("SCHEMA"); + if (schema == null) + throw new AssertionError("Unable to find schema for " + e.getKey() + "; status was " + status); + schema = schema.split(":")[1]; + + if (current == null) + { + current = schema; + } + else if (!current.equals(schema)) + { + return false; + } + } + return true; + }); + } + /** * Get the gossip information from the node. Currently only address, generation, and heartbeat are returned * diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java new file mode 100644 index 0000000000..56de092844 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.hostreplacement; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.implementation.bind.annotation.This; +import org.apache.cassandra.auth.CassandraRoleManager; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.metrics.ClientRequestsMetricsHolder; +import org.apache.cassandra.streaming.StreamException; +import 
org.apache.cassandra.streaming.StreamResultFuture; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart; +import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked; +import static org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster; + +public class FailedBootstrapTest extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(FailedBootstrapTest.class); + + private static final int NODE_TO_REMOVE = 2; + + @Test + public void roleSetupDoesNotProduceUnavailables() throws IOException + { + Cluster.Builder builder = Cluster.build(3) + .withConfig(c -> c.with(Feature.values())) + .withInstanceInitializer(BB::install); + TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3, builder.getTokenCount()); + builder = builder.withTokenSupplier((TokenSupplier) node -> even.tokens(node == 4 ? NODE_TO_REMOVE : node)); + try (Cluster cluster = builder.start()) + { + List<IInvokableInstance> alive = Arrays.asList(cluster.get(1), cluster.get(3)); + IInvokableInstance nodeToRemove = cluster.get(NODE_TO_REMOVE); + + setupCluster(cluster); + + stopUnchecked(nodeToRemove); + + // should fail to join, but should start up! 
+ IInvokableInstance added = replaceHostAndStart(cluster, nodeToRemove, p -> p.setProperty("cassandra.superuser_setup_delay_ms", "1")); + // log gossip for debugging + alive.forEach(i -> { + NodeToolResult result = i.nodetoolResult("gossipinfo"); + result.asserts().success(); + logger.info("gossipinfo for node{}\n{}", i.config().num(), result.getStdout()); + }); + + // CassandraRoleManager attempted to do distributed reads while bootstrap was still going (it failed, so still in bootstrap mode) + // so we need to validate that this is no longer happening and that we are not incrementing org.apache.cassandra.metrics.ClientRequestMetrics.unavailables + // wait long enough for multiple retry attempts to have happened... + Awaitility.await() + .atMost(1, TimeUnit.MINUTES) + .until(() -> added.callOnInstance(() -> BB.SETUP_SCHEDULE_COUNTER.get()) >= 42); // why 42? just need something large enough to make sure multiple attempts happened + + // do any read metrics have unavailables? + added.runOnInstance(() -> { + Assertions.assertThat(ClientRequestsMetricsHolder.readMetrics.unavailables.getCount()).describedAs("read unavailables").isEqualTo(0); + Assertions.assertThat(ClientRequestsMetricsHolder.casReadMetrics.unavailables.getCount()).describedAs("CAS read unavailables").isEqualTo(0); + }); + } + } + + public static class BB + { + public static void install(ClassLoader classLoader, Integer num) + { + if (num != 4) + return; + + new ByteBuddy().rebase(StreamResultFuture.class) + .method(named("maybeComplete")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + + new ByteBuddy().rebase(CassandraRoleManager.class) + .method(named("scheduleSetupTask")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + } + + public static void maybeComplete(@This StreamResultFuture future) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException + { + Method method = 
future.getClass().getSuperclass().getSuperclass().getDeclaredMethod("tryFailure", Throwable.class); + method.setAccessible(true); + method.invoke(future, new StreamException(future.getCurrentState(), "Stream failed")); + } + + private static final AtomicInteger SETUP_SCHEDULE_COUNTER = new AtomicInteger(0); + public static void scheduleSetupTask(final Callable<?> setupTask, @SuperCall Runnable fn) + { + SETUP_SCHEDULE_COUNTER.incrementAndGet(); + fn.run(); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java index 3de0bf51d5..8219d43ad1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.AssertUtils; +import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.assertj.core.api.Assertions; @@ -210,6 +211,8 @@ public class HostReplacementTest extends TestBaseImpl fixDistributedSchemas(cluster); init(cluster); + ClusterUtils.awaitGossipSchemaMatch(cluster); + populate(cluster); cluster.forEach(i -> i.flush(KEYSPACE)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org