Repository: cassandra Updated Branches: refs/heads/trunk 1b82de8c9 -> b86801e95
Add optional startup delay to wait until peers are ready patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-13993 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b86801e9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b86801e9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b86801e9 Branch: refs/heads/trunk Commit: b86801e95a58c5f1a9c779b21fa57136e0225d61 Parents: 1b82de8 Author: Jason Brown <jasedbr...@gmail.com> Authored: Mon Feb 26 06:38:33 2018 -0800 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Mon Feb 26 06:40:18 2018 -0800 ---------------------------------------------------------------------- .circleci/config.yml | 12 +- CHANGES.txt | 1 + .../org/apache/cassandra/config/Config.java | 5 + .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../org/apache/cassandra/net/MessageOut.java | 29 +++- .../apache/cassandra/net/MessagingService.java | 16 +- .../org/apache/cassandra/net/PingMessage.java | 82 +++++++++ .../apache/cassandra/net/PingVerbHandler.java | 31 ++++ .../org/apache/cassandra/net/PongMessage.java | 50 ++++++ .../net/StartupClusterConnectivityChecker.java | 171 +++++++++++++++++++ .../net/async/OutboundConnectionIdentifier.java | 34 +++- .../net/async/OutboundMessagingConnection.java | 2 +- .../net/async/OutboundMessagingPool.java | 33 ++-- .../cassandra/service/CassandraDaemon.java | 10 ++ .../cassandra/service/EchoVerbHandler.java | 6 +- .../cassandra/service/StorageService.java | 3 + .../StartupClusterConnectivityCheckerTest.java | 129 ++++++++++++++ .../apache/cassandra/service/RemoveTest.java | 2 +- 18 files changed, 587 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/.circleci/config.yml ---------------------------------------------------------------------- diff --git a/.circleci/config.yml 
b/.circleci/config.yml index f881b70..13bc11d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only - build # Set env_settings, env_vars, and workflows/build_and_run_tests based on environment env_settings: &env_settings - <<: *default_env_settings - #<<: *high_capacity_env_settings + #<<: *default_env_settings + <<: *high_capacity_env_settings env_vars: &env_vars - <<: *resource_constrained_env_vars - #<<: *high_capacity_env_vars + #<<: *resource_constrained_env_vars + <<: *high_capacity_env_vars workflows: version: 2 - build_and_run_tests: *default_jobs + #build_and_run_tests: *default_jobs #build_and_run_tests: *with_dtest_jobs_only - #build_and_run_tests: *with_dtest_jobs + build_and_run_tests: *with_dtest_jobs docker_image: &docker_image kjellman/cassandra-test:0.4.3 version: 2 jobs: http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 40b18ae..9e7a599 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993) * Add a few options to nodetool verify (CASSANDRA-14201) * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183) * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 875751b..ad91a9b 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -373,6 +373,11 @@ public class Config public String full_query_log_dir = null; + // parameters to 
adjust how much to delay startup until a certain amount of the cluster is connected to and marked alive + public int block_for_peers_percentage = 70; + public int block_for_peers_timeout_in_secs = 10; + + /** * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()} */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index ccb0a30..2e772c5 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2522,4 +2522,14 @@ public class DatabaseDescriptor { return conf.full_query_log_dir; } + + public static int getBlockForPeersPercentage() + { + return conf.block_for_peers_percentage; + } + + public static int getBlockForPeersTimeoutInSeconds() + { + return conf.block_for_peers_timeout_in_secs; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 7d3c0af..236a770 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -97,6 +98,11 @@ public class
MessageOut<T> public final List<Object> parameters; /** + * Allows sender to explicitly state which connection type the message should be sent on. + */ + public final ConnectionType connectionType; + + /** * Memoization of the serialized size of the just the payload. */ private int payloadSerializedSize = -1; @@ -122,24 +128,33 @@ public class MessageOut<T> this(verb, payload, serializer, - isTracing() - ? Tracing.instance.getTraceHeaders() - : ImmutableList.of()); + isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(), + null); + } + + public MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, ConnectionType connectionType) + { + this(verb, + payload, + serializer, + isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(), + connectionType); } - private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters) + private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType) { - this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters); + this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters, connectionType); } @VisibleForTesting - public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters) + public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType) { this.from = from; this.verb = verb; this.payload = payload; this.serializer = serializer; this.parameters = parameters; + this.connectionType = connectionType; } public <VT> MessageOut<T> withParameter(ParameterType type, VT value) @@ -148,7 +163,7 @@ public class MessageOut<T> newParameters.addAll(parameters); newParameters.add(type); newParameters.add(value); - 
return new MessageOut<T>(verb, payload, serializer, newParameters); + return new MessageOut<T>(verb, payload, serializer, newParameters, connectionType); } public Stage getStage() http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 8fdb395..573cf7d 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import javax.management.MBeanServer; import javax.management.ObjectName; @@ -255,7 +254,10 @@ public final class MessagingService implements MessagingServiceMBean return DatabaseDescriptor.getRangeRpcTimeout(); } }, - // remember to add new verbs at the end, since we serialize by ordinal + PING(), + + // add new verbs after the existing verbs, but *before* the UNUSED verbs, since we serialize by ordinal. + // UNUSED verbs serve as padding for backwards compatibility where a previous version needs to validate a verb from the future.
UNUSED_1, UNUSED_2, UNUSED_3, @@ -263,7 +265,7 @@ public final class MessagingService implements MessagingServiceMBean UNUSED_5, ; - private int id; + private final int id; Verb() { id = ordinal(); @@ -291,7 +293,11 @@ public final class MessagingService implements MessagingServiceMBean static { for (Verb v : values()) + { + if (idToVerbMap.containsKey(v.getId())) + throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + v.getId()); idToVerbMap.put(v.getId(), v); + } } public static Verb fromId(int id) @@ -347,6 +353,8 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE); + + put(Verb.PING, Stage.READ); }}; /** @@ -385,6 +393,8 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.HINT, HintMessage.serializer); put(Verb.BATCH_STORE, Batch.serializer); put(Verb.BATCH_REMOVE, UUIDSerializer.serializer); + + put(Verb.PING, PingMessage.serializer); }}; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/PingMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/PingMessage.java b/src/java/org/apache/cassandra/net/PingMessage.java new file mode 100644 index 0000000..4a19f22 --- /dev/null +++ b/src/java/org/apache/cassandra/net/PingMessage.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.io.IOException; + +import org.apache.cassandra.hints.HintResponse; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType; + +/** + * Conceptually the same as {@link org.apache.cassandra.gms.EchoMessage}, but indicates to the recipient which + * {@link ConnectionType} should be used for the response. 
+ */ +public class PingMessage +{ + public static IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer(); + + public static final PingMessage smallChannelMessage = new PingMessage(ConnectionType.SMALL_MESSAGE); + public static final PingMessage largeChannelMessage = new PingMessage(ConnectionType.LARGE_MESSAGE); + public static final PingMessage gossipChannelMessage = new PingMessage(ConnectionType.GOSSIP); + + public final ConnectionType connectionType; + + public PingMessage(ConnectionType connectionType) + { + this.connectionType = connectionType; + } + + public static class PingMessageSerializer implements IVersionedSerializer<PingMessage> + { + public void serialize(PingMessage t, DataOutputPlus out, int version) throws IOException + { + out.writeByte(t.connectionType.getId()); + } + + public PingMessage deserialize(DataInputPlus in, int version) throws IOException + { + ConnectionType connectionType = ConnectionType.fromId(in.readByte()); + + // if we ever create a new connection type, then during a rolling upgrade, the old nodes won't know about + // the new connection type (as it won't recognize the id), so just default to the small message type. 
+ if (connectionType == null) + connectionType = ConnectionType.SMALL_MESSAGE; + + switch (connectionType) + { + case LARGE_MESSAGE: + return largeChannelMessage; + case GOSSIP: + return gossipChannelMessage; + case SMALL_MESSAGE: + default: + return smallChannelMessage; + } + } + + public long serializedSize(PingMessage t, int version) + { + return 1; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/PingVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/PingVerbHandler.java b/src/java/org/apache/cassandra/net/PingVerbHandler.java new file mode 100644 index 0000000..d959b91 --- /dev/null +++ b/src/java/org/apache/cassandra/net/PingVerbHandler.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net; + +public class PingVerbHandler implements IVerbHandler<PingMessage> +{ + @Override + public void doVerb(MessageIn<PingMessage> message, int id) + { + MessageOut<PongMessage> msg = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, PongMessage.instance, + PongMessage.serializer, + message.payload.connectionType); + MessagingService.instance().sendReply(msg, id, message.from); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/PongMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/PongMessage.java b/src/java/org/apache/cassandra/net/PongMessage.java new file mode 100644 index 0000000..bb89cdf --- /dev/null +++ b/src/java/org/apache/cassandra/net/PongMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net; + +import java.io.IOException; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class PongMessage +{ + public static final PongMessage instance = new PongMessage(); + public static IVersionedSerializer<PongMessage> serializer = new PongMessage.PongMessageSerializer(); + + private PongMessage() + { } + + public static class PongMessageSerializer implements IVersionedSerializer<PongMessage> + { + public void serialize(PongMessage t, DataOutputPlus out, int version) throws IOException + { } + + public PongMessage deserialize(DataInputPlus in, int version) throws IOException + { + return instance; + } + + public long serializedSize(PongMessage t, int version) + { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java new file mode 100644 index 0000000..f22ab48 --- /dev/null +++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.net.MessagingService.Verb.PING; + +public class StartupClusterConnectivityChecker +{ + private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); + + enum State { CONTINUE, FINISH_SUCCESS, FINISH_TIMEOUT } + + private final int targetPercent; + private final int timeoutSecs; + private final Predicate<InetAddressAndPort> gossipStatus; + + public StartupClusterConnectivityChecker(int targetPercent, int timeoutSecs, Predicate<InetAddressAndPort> gossipStatus) + { + if (targetPercent < 0) + { + targetPercent = 0; + } + else if (targetPercent > 100) + { + targetPercent = 100; + } + this.targetPercent = targetPercent; + + if (timeoutSecs < 0) + { + timeoutSecs = 1; + } + else if (timeoutSecs > 100) + { + logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs); + } + this.timeoutSecs = timeoutSecs; + + this.gossipStatus = gossipStatus; + } + + public void 
execute(Set<InetAddressAndPort> peers) + { + if (peers == null || targetPercent == 0) + return; + + // remove current node from the set + peers = peers.stream() + .filter(peer -> !peer.equals(FBUtilities.getBroadcastAddressAndPort())) + .collect(Collectors.toSet()); + + // don't block if there's no other nodes in the cluster (or we don't know about them) + if (peers.size() <= 0) + return; + + logger.info("choosing to block until {}% of peers are marked alive and connections are established; max time to wait = {} seconds", + targetPercent, timeoutSecs); + + // first, send out a ping message to open up the non-gossip connections + final AtomicInteger connectedCount = sendPingMessages(peers); + + final long startNanos = System.nanoTime(); + final long expirationNanos = startNanos + TimeUnit.SECONDS.toNanos(timeoutSecs); + int completedRounds = 0; + while (checkStatus(peers, connectedCount, startNanos, expirationNanos < System.nanoTime(), completedRounds) == State.CONTINUE) + { + completedRounds++; + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MICROSECONDS); + } + } + + State checkStatus(Set<InetAddressAndPort> peers, AtomicInteger connectedCount, final long startNanos, boolean beyondExpiration, final int completedRounds) + { + long currentAlive = peers.stream().filter(gossipStatus).count(); + float currentAlivePercent = ((float) currentAlive / (float) peers.size()) * 100; + + // assume two connections to remote host that we care to track here (small msg & large msg) + final int totalConnectionsSize = peers.size() * 2; + final int connectionsCount = connectedCount.get(); + float currentConnectedPercent = ((float) connectionsCount / (float) totalConnectionsSize) * 100; + + if (currentAlivePercent >= targetPercent && currentConnectedPercent >= targetPercent) + { + logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " + + "and {}% ({} / {}) of peers as connected, " + + "both of which are above the desired threshold of {}%", + 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos), + currentAlivePercent, currentAlive, peers.size(), + currentConnectedPercent, connectionsCount, totalConnectionsSize, + targetPercent); + return State.FINISH_SUCCESS; + } + + // perform at least two rounds of checking, else this is kinda useless (and the operator set the timeoutSecs too low) + if (completedRounds >= 2 && beyondExpiration) + { + logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " + + "and {}% ({} / {}) of peers as connected, " + + "one or both of which is below the desired threshold of {}%", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos), + currentAlivePercent, currentAlive, peers.size(), + currentConnectedPercent, connectionsCount, totalConnectionsSize, + targetPercent); + return State.FINISH_TIMEOUT; + } + return State.CONTINUE; + } + + /** + * Sends a "connection warmup" message to each peer in the collection, on every {@link OutboundConnectionIdentifier.ConnectionType} + * used for internode messaging.
+ */ + private AtomicInteger sendPingMessages(Set<InetAddressAndPort> peers) + { + AtomicInteger connectedCount = new AtomicInteger(0); + IAsyncCallback responseHandler = new IAsyncCallback() + { + @Override + public boolean isLatencyForSnitch() + { + return false; + } + + @Override + public void response(MessageIn msg) + { + connectedCount.incrementAndGet(); + } + }; + + MessageOut<PingMessage> smallChannelMessageOut = new MessageOut<>(PING, PingMessage.smallChannelMessage, PingMessage.serializer); + MessageOut<PingMessage> largeChannelMessageOut = new MessageOut<>(PING, PingMessage.largeChannelMessage, PingMessage.serializer); + for (InetAddressAndPort peer : peers) + { + MessagingService.instance().sendRR(smallChannelMessageOut, peer, responseHandler); + MessagingService.instance().sendRR(largeChannelMessageOut, peer, responseHandler); + } + + return connectedCount; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java index f3cb554..e309065 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java +++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java @@ -18,9 +18,8 @@ package org.apache.cassandra.net.async; -import java.net.InetAddress; -import java.net.InetSocketAddress; - +import com.carrotsearch.hppc.IntObjectMap; +import com.carrotsearch.hppc.IntObjectOpenHashMap; import org.apache.cassandra.locator.InetAddressAndPort; /** @@ -32,9 +31,34 @@ import org.apache.cassandra.locator.InetAddressAndPort; */ public class OutboundConnectionIdentifier { - enum ConnectionType + public enum ConnectionType { - GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE, STREAM + GOSSIP (0), LARGE_MESSAGE (1), 
SMALL_MESSAGE (2), STREAM (3); + + private final int id; + + ConnectionType(int id) + { + this.id = id; + } + + public int getId() + { + return id; + } + + private static final IntObjectMap<ConnectionType> idMap = new IntObjectOpenHashMap<>(values().length); + static + { + for (ConnectionType type : values()) + idMap.put(type.id, type); + } + + public static ConnectionType fromId(int id) + { + return idMap.get(id); + } + } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java index 28775ef..064131b 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java +++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java @@ -479,7 +479,7 @@ public class OutboundMessagingConnection { case SUCCESS: assert result.channelWriter != null; - logger.debug("successfully connected to {}, conmpress = {}, coalescing = {}", connectionId, + logger.debug("successfully connected to {}, compress = {}, coalescing = {}", connectionId, shouldCompressConnection(connectionId.local(), connectionId.remote()), coalescingStrategy.isPresent() ? 
coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED); if (state.get() == State.CLOSED) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java index c701229..14650a7 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java +++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java @@ -18,7 +18,6 @@ package org.apache.cassandra.net.async; -import java.net.InetSocketAddress; import java.util.Optional; import com.google.common.annotations.VisibleForTesting; @@ -101,14 +100,21 @@ public class OutboundMessagingPool @VisibleForTesting public OutboundMessagingConnection getConnection(MessageOut msg) { - // optimize for the common path (the small message channel) - if (Stage.GOSSIP != msg.getStage()) + if (msg.connectionType == null) { - return msg.serializedSize(smallMessageChannel.getTargetVersion()) < LARGE_MESSAGE_THRESHOLD - ? smallMessageChannel - : largeMessageChannel; + // optimize for the common path (the small message channel) + if (Stage.GOSSIP != msg.getStage()) + { + return msg.serializedSize(smallMessageChannel.getTargetVersion()) < LARGE_MESSAGE_THRESHOLD + ? smallMessageChannel + : largeMessageChannel; + } + return gossipChannel; + } + else + { + return getConnection(msg.connectionType); } - return gossipChannel; } /** @@ -138,20 +144,17 @@ public class OutboundMessagingPool smallMessageChannel.close(softClose); } - /** - * For testing purposes only. 
- */ @VisibleForTesting - OutboundMessagingConnection getConnection(ConnectionType connectionType) + final OutboundMessagingConnection getConnection(ConnectionType connectionType) { switch (connectionType) { - case GOSSIP: - return gossipChannel; - case LARGE_MESSAGE: - return largeMessageChannel; case SMALL_MESSAGE: return smallMessageChannel; + case LARGE_MESSAGE: + return largeMessageChannel; + case GOSSIP: + return gossipChannel; default: throw new IllegalArgumentException("unsupported connection type: " + connectionType); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 130f3fd..295a33b 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -25,7 +25,10 @@ import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.StandardMBean; @@ -47,6 +50,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.StartupClusterConnectivityChecker; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.Schema; @@ -491,6 +495,12 @@ public class CassandraDaemon */ public void start() { + StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(DatabaseDescriptor.getBlockForPeersPercentage(), + 
DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(), + Gossiper.instance::isAlive); + Set<InetAddressAndPort> peers = Gossiper.instance.getEndpointStates().stream().map(Map.Entry::getKey).collect(Collectors.toSet()); + connectivityChecker.execute(peers); + String nativeFlag = System.getProperty("cassandra.start_native_transport"); if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport())) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/service/EchoVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/EchoVerbHandler.java b/src/java/org/apache/cassandra/service/EchoVerbHandler.java index d0c435e..1cc52e9 100644 --- a/src/java/org/apache/cassandra/service/EchoVerbHandler.java +++ b/src/java/org/apache/cassandra/service/EchoVerbHandler.java @@ -26,16 +26,20 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType; + public class EchoVerbHandler implements IVerbHandler<EchoMessage> { private static final Logger logger = LoggerFactory.getLogger(EchoVerbHandler.class); public void doVerb(MessageIn<EchoMessage> message, int id) { - MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance, EchoMessage.serializer); + MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance, + EchoMessage.serializer, ConnectionType.GOSSIP); logger.trace("Sending a EchoMessage reply {}", message.from); MessagingService.instance().sendReply(echoMessage, id, message.from); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d465431..51b77a6 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -92,6 +92,7 @@ import org.apache.cassandra.schema.SchemaVersionVerbHandler; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.ViewMetadata; +import org.apache.cassandra.repair.RepairMessageVerbHandler; import org.apache.cassandra.service.paxos.CommitVerbHandler; import org.apache.cassandra.service.paxos.PrepareVerbHandler; import org.apache.cassandra.service.paxos.ProposeVerbHandler; @@ -301,6 +302,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE, new BatchStoreVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE, new BatchRemoveVerbHandler()); + + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PING, new PingVerbHandler()); } public void registerDaemon(CassandraDaemon daemon) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java new file mode 100644 index 0000000..12f54c6 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java @@ -0,0 +1,129 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.net.UnknownHostException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +import com.google.common.net.InetAddresses; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.locator.InetAddressAndPort; + +public class StartupClusterConnectivityCheckerTest +{ + @Test + public void testConnectivity_SimpleHappyPath() throws UnknownHostException + { + StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true); + int count = 10; + Set<InetAddressAndPort> peers = createNodes(count); + Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_SUCCESS, + connectivityChecker.checkStatus(peers, new AtomicInteger(count * 2), System.nanoTime(), false, 0)); + } + + @Test + public void testConnectivity_SimpleContinue() throws UnknownHostException + { + StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true); + int count = 10; + Set<InetAddressAndPort> peers = createNodes(count); + 
Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE, + connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 0)); + } + + @Test + public void testConnectivity_Timeout() throws UnknownHostException + { + StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true); + int count = 10; + Set<InetAddressAndPort> peers = createNodes(count); + Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE, + connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 4)); + Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_TIMEOUT, + connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), true, 5)); + } + + @Test + public void testConnectivity_SimpleUpdating() throws UnknownHostException + { + UpdatablePredicate predicate = new UpdatablePredicate(); + final int count = 100; + final int thresholdPercentage = 70; + StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(thresholdPercentage, 10, predicate); + Set<InetAddressAndPort> peers = createNodes(count); + + AtomicInteger connectedCount = new AtomicInteger(); + + for (int i = 0; i < count; i++) + { + predicate.reset(i); + connectedCount.set(i * 2); + StartupClusterConnectivityChecker.State expectedState = i < thresholdPercentage ? + StartupClusterConnectivityChecker.State.CONTINUE : + StartupClusterConnectivityChecker.State.FINISH_SUCCESS; + Assert.assertEquals("failed on iteration " + i, + expectedState, connectivityChecker.checkStatus(peers, connectedCount, System.nanoTime(), false, i)); + } + } + + /** + * Returns true for the first {@code threshold} calls to test() after reset(), then returns false.
+ */ + private class UpdatablePredicate implements Predicate<InetAddressAndPort> + { + int index; + int threshold; + + void reset(int threshold) + { + index = 0; + this.threshold = threshold; + } + + @Override + public boolean test(InetAddressAndPort inetAddressAndPort) + { + index++; + return index <= threshold; + } + } + + private static Set<InetAddressAndPort> createNodes(int count) throws UnknownHostException + { + Set<InetAddressAndPort> nodes = new HashSet<>(); + + if (count < 1) + Assert.fail("need at least *one* node in the set!"); + + InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1"); + nodes.add(node); + for (int i = 1; i < count; i++) + { + node = InetAddressAndPort.getByAddress(InetAddresses.increment(node.address)); + nodes.add(node); + } + return nodes; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index 6714a83..0d39322 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -161,7 +161,7 @@ public class RemoveTest for (InetAddressAndPort host : hosts) { - MessageOut msg = new MessageOut(host, MessagingService.Verb.REPLICATION_FINISHED, null, null, Collections.<Object>emptyList()); + MessageOut msg = new MessageOut(host, MessagingService.Verb.REPLICATION_FINISHED, null, null, Collections.<Object>emptyList(), null); MessagingService.instance().sendRR(msg, FBUtilities.getBroadcastAddressAndPort()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org