This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new 5beab63b55 Improve the way we handle repair message timeouts to avoid hanging repairs 5beab63b55 is described below commit 5beab63b5550efb5e31e5005f90649661a9fe595 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Mon Aug 29 13:27:16 2022 +0200 Improve the way we handle repair message timeouts to avoid hanging repairs Patch by marcuse; reviewed by David Capwell for CASSANDRA-17613 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 10 +++ src/java/org/apache/cassandra/net/Verb.java | 33 +++---- .../cassandra/repair/AsymmetricRemoteSyncTask.java | 7 +- .../cassandra/repair/RepairMessageVerbHandler.java | 4 + .../cassandra/repair/StreamingRepairTask.java | 9 +- .../cassandra/repair/SymmetricRemoteSyncTask.java | 11 --- src/java/org/apache/cassandra/repair/SyncTask.java | 12 +++ .../apache/cassandra/repair/ValidationTask.java | 9 +- .../cassandra/repair/messages/RepairMessage.java | 63 +++++++++++++ .../apache/cassandra/service/StorageService.java | 13 +++ .../cassandra/service/StorageServiceMBean.java | 3 + .../distributed/test/RepairRequestTimeoutTest.java | 100 +++++++++++++++++++++ 14 files changed, 240 insertions(+), 37 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e1f8d19c30..96a37f53e9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.7 + * Avoid getting hanging repairs due to repair message timeouts (CASSANDRA-17613) * Prevent infinite loop in repair coordinator on FailSession (CASSANDRA-17834) Merged from 3.11: * Fix potential IndexOutOfBoundsException in PagingState in mixed mode clusters (CASSANDRA-17840) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index e8aa297e77..f8d8d46db8 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -134,6 +134,8 @@ public class Config public volatile Integer repair_session_max_tree_depth = null; public volatile Integer repair_session_space_in_mb = null; + public volatile long repair_request_timeout_in_ms = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); + public volatile boolean use_offheap_merkle_trees = true; public int storage_port = 7000; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 82a5ec73ba..f78a5b668a 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1723,6 +1723,16 @@ public class DatabaseDescriptor return unit.convert(getBlockForPeersTimeoutInSeconds(), TimeUnit.SECONDS); } + public static long getRepairRpcTimeout(TimeUnit unit) + { + return unit.convert(conf.repair_request_timeout_in_ms, MILLISECONDS); + } + + public static void setRepairRpcTimeout(long time, TimeUnit unit) + { + conf.repair_request_timeout_in_ms = MILLISECONDS.convert(time, unit); + } + public static double getPhiConvictThreshold() { return conf.phi_convict_threshold; diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index fad2fbf6a9..9d8b76dd35 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -147,22 +147,22 @@ public enum Verb SCHEMA_VERSION_REQ (20, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> SchemaVersionVerbHandler.instance, SCHEMA_VERSION_RSP ), // repair; mostly doesn't use callbacks and sends responses as their own request messages, with matching sessions by uuid; should eventually harmonize and make idiomatic - REPAIR_RSP (100, P1, rpcTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), - VALIDATION_RSP (102, P1, rpcTimeout, ANTI_ENTROPY, () -> ValidationResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - VALIDATION_REQ (101, P1, rpcTimeout, ANTI_ENTROPY, () -> ValidationRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - SYNC_RSP (104, P1, rpcTimeout, ANTI_ENTROPY, () -> SyncResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - SYNC_REQ (103, P1, rpcTimeout, ANTI_ENTROPY, () -> SyncRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - PREPARE_MSG (105, P1, rpcTimeout, ANTI_ENTROPY, () -> PrepareMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - SNAPSHOT_MSG (106, P1, rpcTimeout, ANTI_ENTROPY, () -> SnapshotMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - CLEANUP_MSG (107, P1, rpcTimeout, ANTI_ENTROPY, () -> CleanupMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - PREPARE_CONSISTENT_RSP (109, P1, rpcTimeout, ANTI_ENTROPY, () -> PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - PREPARE_CONSISTENT_REQ (108, P1, rpcTimeout, ANTI_ENTROPY, () -> PrepareConsistentRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - FINALIZE_PROPOSE_MSG (110, P1, rpcTimeout, ANTI_ENTROPY, () -> FinalizePropose.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - FINALIZE_PROMISE_MSG (111, P1, rpcTimeout, ANTI_ENTROPY, () -> FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - FINALIZE_COMMIT_MSG (112, P1, rpcTimeout, ANTI_ENTROPY, () -> FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - FAILED_SESSION_MSG (113, P1, rpcTimeout, ANTI_ENTROPY, () -> FailSession.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - STATUS_RSP (115, P1, rpcTimeout, ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - STATUS_REQ (114, P1, rpcTimeout, ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + REPAIR_RSP (100, P1, repairMsgTimeout,REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + VALIDATION_RSP (102, P1, longTimeout ,ANTI_ENTROPY, () -> ValidationResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + VALIDATION_REQ (101, P1, repairMsgTimeout,ANTI_ENTROPY, () -> ValidationRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + SYNC_RSP (104, P1, repairMsgTimeout,ANTI_ENTROPY, () -> SyncResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + SYNC_REQ (103, P1, repairMsgTimeout,ANTI_ENTROPY, () -> SyncRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + PREPARE_MSG (105, P1, repairMsgTimeout,ANTI_ENTROPY, () -> PrepareMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + SNAPSHOT_MSG (106, P1, repairMsgTimeout,ANTI_ENTROPY, () -> SnapshotMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + CLEANUP_MSG (107, P1, repairMsgTimeout,ANTI_ENTROPY, () -> CleanupMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + PREPARE_CONSISTENT_RSP (109, P1, repairMsgTimeout,ANTI_ENTROPY, () -> PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + PREPARE_CONSISTENT_REQ (108, P1, repairMsgTimeout,ANTI_ENTROPY, () -> PrepareConsistentRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FINALIZE_PROPOSE_MSG (110, P1, repairMsgTimeout,ANTI_ENTROPY, () -> FinalizePropose.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FINALIZE_PROMISE_MSG (111, P1, repairMsgTimeout,ANTI_ENTROPY, () -> FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FINALIZE_COMMIT_MSG (112, P1, repairMsgTimeout,ANTI_ENTROPY, () -> FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FAILED_SESSION_MSG (113, P1, repairMsgTimeout,ANTI_ENTROPY, () -> FailSession.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + STATUS_RSP (115, P1, repairMsgTimeout,ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + STATUS_REQ (114, P1, repairMsgTimeout,ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), REPLICATION_DONE_RSP (82, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), REPLICATION_DONE_REQ (22, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP), @@ -450,4 +450,5 @@ class VerbTimeouts static final ToLongFunction<TimeUnit> pingTimeout = DatabaseDescriptor::getPingTimeout; static final ToLongFunction<TimeUnit> longTimeout = units -> Math.max(DatabaseDescriptor.getRpcTimeout(units), units.convert(5L, TimeUnit.MINUTES)); static final ToLongFunction<TimeUnit> noTimeout = units -> { throw new IllegalStateException(); }; + static final ToLongFunction<TimeUnit> repairMsgTimeout= DatabaseDescriptor::getRepairRpcTimeout; } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java index 40a1f514be..9762c9f08f 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java @@ -20,20 +20,17 @@ package org.apache.cassandra.repair; import java.util.List; + import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; -import static org.apache.cassandra.net.Verb.SYNC_REQ; - /** * AsymmetricRemoteSyncTask sends {@link SyncRequest} to target node to repair(stream) * data with other target replica. @@ -53,7 +50,7 @@ public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRem SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, true); String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); Tracing.traceRepair(message); - MessagingService.instance().send(Message.out(SYNC_REQ, request), request.src); + sendRequest(request, request.src); } public void syncComplete(boolean success, List<SessionSummary> summaries) diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index bfc2657ab4..7488f2ebc1 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -124,6 +124,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> break; case VALIDATION_REQ: + // notify initiator that the message has been received, allowing this method to take as long as it needs to + MessagingService.instance().send(message.emptyResponse(), message.from()); ValidationRequest validationRequest = (ValidationRequest) message.payload; logger.debug("Validating {}", validationRequest); // trigger read-only compaction @@ -143,6 +145,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> break; case SYNC_REQ: + // notify initiator that the message has been received, allowing this method to take as long as it needs to + MessagingService.instance().send(message.emptyResponse(), message.from()); // forwarded sync request SyncRequest request = (SyncRequest) message.payload; logger.debug("Syncing {}", request); diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index fbfbac8748..9c6caf46f5 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -39,7 +39,11 @@ import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.streaming.StreamOperation; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.net.Verb.SYNC_RSP; +import static org.apache.cassandra.utils.MonotonicClock.approxTime; + /** * StreamingRepairTask performs data streaming between two remote replicas, neither of which is repair coordinator. @@ -73,7 +77,10 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler public void run() { logger.info("[streaming task #{}] Performing {}streaming repair of {} ranges with {}", desc.sessionId, asymmetric ? "asymmetric " : "", ranges.size(), dst); - createStreamPlan(dst).execute(); + long start = approxTime.now(); + StreamPlan streamPlan = createStreamPlan(dst); + logger.info("[streaming task #{}] Stream plan created in {}ms", desc.sessionId, MILLISECONDS.convert(approxTime.now() - start, NANOSECONDS)); + streamPlan.execute(); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java index b4e2d9c5a1..629f4bb8aa 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java @@ -27,17 +27,12 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; -import static org.apache.cassandra.net.Verb.SYNC_REQ; - /** * SymmetricRemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node * to repair(stream) data with other replica. @@ -53,16 +48,10 @@ public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemo super(desc, r1, r2, differences, previewKind); } - void sendRequest(SyncRequest request, InetAddressAndPort to) - { - MessagingService.instance().send(Message.out(SYNC_REQ, request), to); - } - @Override protected void startSync() { InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, false); Preconditions.checkArgument(nodePair.coordinator.equals(request.src)); String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java index fe9f09ed7e..24e206828d 100644 --- a/src/java/org/apache/cassandra/repair/SyncTask.java +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@ -32,9 +32,13 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; +import static org.apache.cassandra.net.Verb.SYNC_REQ; + public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable { private static final Logger logger = LoggerFactory.getLogger(SyncTask.class); @@ -101,4 +105,12 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna } public void abort() {} + + void sendRequest(SyncRequest request, InetAddressAndPort to) + { + RepairMessage.sendMessageWithFailureCB(request, + SYNC_REQ, + to, + this::setException); + } } diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java index 0161acf8d8..b4aef249c5 100644 --- a/src/java/org/apache/cassandra/repair/ValidationTask.java +++ b/src/java/org/apache/cassandra/repair/ValidationTask.java @@ -21,8 +21,7 @@ import com.google.common.util.concurrent.AbstractFuture; import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.ValidationRequest; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.MerkleTrees; @@ -53,8 +52,10 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn */ public void run() { - ValidationRequest request = new ValidationRequest(desc, nowInSec); - MessagingService.instance().send(Message.out(VALIDATION_REQ, request), endpoint); + RepairMessage.sendMessageWithFailureCB(new ValidationRequest(desc, nowInSec), + VALIDATION_REQ, + endpoint, + this::setException); } /** diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index 3137b4e474..165911dc9b 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -17,7 +17,23 @@ */ package org.apache.cassandra.repair.messages; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.exceptions.RepairException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.net.Verb; import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.utils.CassandraVersion; + +import static org.apache.cassandra.net.MessageFlag.CALL_BACK_ON_FAILURE; /** * Base class of all repair related request/response messages. @@ -26,10 +42,57 @@ import org.apache.cassandra.repair.RepairJobDesc; */ public abstract class RepairMessage { + private static final CassandraVersion SUPPORTS_TIMEOUTS = new CassandraVersion("4.0.7-SNAPSHOT"); + private static final Logger logger = LoggerFactory.getLogger(RepairMessage.class); public final RepairJobDesc desc; protected RepairMessage(RepairJobDesc desc) { this.desc = desc; } + + public interface RepairFailureCallback + { + void onFailure(Exception e); + } + + public static void sendMessageWithFailureCB(RepairMessage request, Verb verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback) + { + RequestCallback<?> callback = new RequestCallback<Object>() + { + @Override + public void onResponse(Message<Object> msg) + { + logger.info("[#{}] {} received by {}", request.desc.parentSessionId, verb, endpoint); + // todo: at some point we should make repair messages follow the normal path, actually using this + } + + @Override + public boolean invokeOnFailure() + { + return true; + } + + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + logger.error("[#{}] {} failed on {}: {}", request.desc.parentSessionId, verb, from, failureReason); + + if (supportsTimeouts(from, request.desc.parentSessionId)) + failureCallback.onFailure(new RepairException(request.desc, String.format("Got %s failure from %s: %s", verb, from, failureReason))); + } + }; + + MessagingService.instance().sendWithCallback(Message.outWithFlag(verb, request, CALL_BACK_ON_FAILURE), + endpoint, + callback); + } + + private static boolean supportsTimeouts(InetAddressAndPort from, UUID parentSessionId) + { + CassandraVersion remoteVersion = Gossiper.instance.getReleaseVersion(from); + if (remoteVersion != null && remoteVersion.compareTo(SUPPORTS_TIMEOUTS) >= 0) + return true; + logger.warn("[#{}] Not failing repair due to remote host {} not supporting repair message timeouts (version = {})", parentSessionId, from, remoteVersion); + return false; + } } diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 4568c5af8f..4721113ad3 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -6043,4 +6043,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info("Changing keyspace count warn threshold from {} to {}", getKeyspaceCountWarnThreshold(), value); DatabaseDescriptor.setKeyspaceCountWarnThreshold(value); } + + public Long getRepairRpcTimeout() + { + return DatabaseDescriptor.getRepairRpcTimeout(MILLISECONDS); + } + + public void setRepairRpcTimeout(Long timeoutInMillis) + { + Preconditions.checkState(timeoutInMillis > 0); + DatabaseDescriptor.setRepairRpcTimeout(timeoutInMillis, MILLISECONDS); + logger.info("RepairRpcTimeout set to {}ms via JMX", timeoutInMillis); + } + } diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 30e643bc1d..c61e45e7a4 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -862,4 +862,7 @@ public interface StorageServiceMBean extends NotificationEmitter void setTableCountWarnThreshold(int value); int getKeyspaceCountWarnThreshold(); void setKeyspaceCountWarnThreshold(int value); + + public Long getRepairRpcTimeout(); + public void setRepairRpcTimeout(Long timeoutInMillis); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairRequestTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairRequestTimeoutTest.java new file mode 100644 index 0000000000..33e0e78724 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairRequestTimeoutTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.net.Verb; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.net.Verb.SYNC_REQ; +import static org.apache.cassandra.net.Verb.VALIDATION_REQ; +import static org.junit.Assert.assertTrue; + +public class RepairRequestTimeoutTest extends TestBaseImpl +{ + static Cluster CLUSTER; + static final long timeoutMillis = 1000; + @BeforeClass + public static void setup() throws IOException + { + CLUSTER = init(Cluster.build(3) + .withConfig(config -> config.with(GOSSIP, NETWORK).set("repair_request_timeout_in_ms", timeoutMillis)) + .start()); + CLUSTER.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); + } + + @Before + public void before() + { + CLUSTER.filters().reset(); + } + + @Test + public void testLostSYNC_REQ() + { + testLostMessageHelper(SYNC_REQ); + } + + @Test + public void testLostVALIDATION_REQ() + { + testLostMessageHelper(VALIDATION_REQ); + } + + public void testLostMessageHelper(Verb verb) + { + for (int i = 0; i < 10; i++) + CLUSTER.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.ALL, i); + for (int i = 10; i < 20; i++) + CLUSTER.get((i % 3) + 1).executeInternal(withKeyspace("insert into %s.tbl (id) values (?)"), i); + CLUSTER.forEach(i -> i.flush(KEYSPACE)); + CLUSTER.filters().verbs(verb.id).drop(); + // symmetric vs asymmetric: + CLUSTER.get(1).nodetoolResult("repair", "-full", KEYSPACE, "tbl").asserts().failure().notificationContains(verb + " failure from"); + CLUSTER.get(1).nodetoolResult("repair", "-full", "-os", KEYSPACE, "tbl").asserts().failure().notificationContains(verb + " failure from"); + + // and success + CLUSTER.filters().reset(); + long mark = CLUSTER.get(1).logs().mark(); + + CLUSTER.get(1).nodetoolResult("repair", "-full", KEYSPACE, "tbl").asserts().success(); + for (int i = 10; i < 20; i++) + CLUSTER.get((i % 3) + 1).executeInternal(withKeyspace("insert into %s.tbl (id) values (?)"), i); + + CLUSTER.get(1).nodetoolResult("repair", "-full", "-os", KEYSPACE, "tbl").asserts().success(); + CLUSTER.get(1).runOnInstance(() -> { + // make sure we don't get any expirations after the repair has finished + long expirationInterval = DatabaseDescriptor.getMinRpcTimeout(MILLISECONDS) / 2; // see RequestCallbacks.java + sleepUninterruptibly((timeoutMillis + expirationInterval) * 2, MILLISECONDS); + }); + + assertTrue(CLUSTER.get(1).logs().grep(mark, "failure from").getResult().isEmpty()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org