This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e4fac3582e0a9dda182313a3aa784be35d965f4e Merge: 94663c3 50d8245 Author: David Capwell <dcapw...@apache.org> AuthorDate: Fri Nov 13 12:37:46 2020 -0800 Merge branch 'cassandra-3.11' into trunk .../apache/cassandra/config/CassandraRelevantProperties.java | 11 +++++++++++ src/java/org/apache/cassandra/service/StorageService.java | 12 +++++++----- .../apache/cassandra/distributed/action/GossipHelper.java | 7 ++++++- .../cassandra/distributed/test/ring/BootstrapTest.java | 6 ++++-- 4 files changed, 28 insertions(+), 8 deletions(-) diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 881b7d9,0000000..7402aa1 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@@ -1,240 -1,0 +1,251 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.config; + +import org.apache.cassandra.exceptions.ConfigurationException; + +/** A class that extracts system properties for the cassandra node it runs within. */ +public enum CassandraRelevantProperties +{ + //base JVM properties + JAVA_HOME("java.home"), + CASSANDRA_PID_FILE ("cassandra-pidfile"), + + /** + * Indicates the temporary directory used by the Java Virtual Machine (JVM) + * to create and store temporary files. + */ + JAVA_IO_TMPDIR ("java.io.tmpdir"), + + /** + * Path from which to load native libraries. + * Default is absolute path to lib directory. + */ + JAVA_LIBRARY_PATH ("java.library.path"), + + JAVA_SECURITY_EGD ("java.security.egd"), + + /** Java Runtime Environment version */ + JAVA_VERSION ("java.version"), + + /** Java Virtual Machine implementation name */ + JAVA_VM_NAME ("java.vm.name"), + + /** Line separator ("\n" on UNIX). */ + LINE_SEPARATOR ("line.separator"), + + /** Java class path. */ + JAVA_CLASS_PATH ("java.class.path"), + + /** Operating system architecture. */ + OS_ARCH ("os.arch"), + + /** Operating system name. */ + OS_NAME ("os.name"), + + /** User's home directory. */ + USER_HOME ("user.home"), + + /** Platform word size sun.arch.data.model. Examples: "32", "64", "unknown"*/ + SUN_ARCH_DATA_MODEL ("sun.arch.data.model"), + + //JMX properties + /** + * The value of this property represents the host name string + * that should be associated with remote stubs for locally created remote objects, + * in order to allow clients to invoke methods on the remote object. + */ + JAVA_RMI_SERVER_HOSTNAME ("java.rmi.server.hostname"), + + /** + * If this value is true, object identifiers for remote objects exported by this VM will be generated by using + * a cryptographically secure random number generator. The default value is false. + */ + JAVA_RMI_SERVER_RANDOM_ID ("java.rmi.server.randomIDs"), + + /** + * This property indicates whether password authentication for remote monitoring is + * enabled. By default it is disabled - com.sun.management.jmxremote.authenticate + */ + COM_SUN_MANAGEMENT_JMXREMOTE_AUTHENTICATE ("com.sun.management.jmxremote.authenticate"), + + /** + * The port number to which the RMI connector will be bound - com.sun.management.jmxremote.rmi.port. + * An Integer object that represents the value of the second argument is returned + * if there is no port specified, if the port does not have the correct numeric format, + * or if the specified name is empty or null. + */ + COM_SUN_MANAGEMENT_JMXREMOTE_RMI_PORT ("com.sun.management.jmxremote.rmi.port", "0"), + + /** Cassandra jmx remote port */ + CASSANDRA_JMX_REMOTE_PORT("cassandra.jmx.remote.port"), + + /** This property indicates whether SSL is enabled for monitoring remotely. Default is set to false. */ + COM_SUN_MANAGEMENT_JMXREMOTE_SSL ("com.sun.management.jmxremote.ssl"), + + /** + * This property indicates whether SSL client authentication is enabled - com.sun.management.jmxremote.ssl.need.client.auth. + * Default is set to false. + */ + COM_SUN_MANAGEMENT_JMXREMOTE_SSL_NEED_CLIENT_AUTH ("com.sun.management.jmxremote.ssl.need.client.auth"), + + /** + * This property indicates the location for the access file. If com.sun.management.jmxremote.authenticate is false, + * then this property and the password and access files, are ignored. Otherwise, the access file must exist and + * be in the valid format. If the access file is empty or nonexistent, then no access is allowed. + */ + COM_SUN_MANAGEMENT_JMXREMOTE_ACCESS_FILE ("com.sun.management.jmxremote.access.file"), + + /** This property indicates the path to the password file - com.sun.management.jmxremote.password.file */ + COM_SUN_MANAGEMENT_JMXREMOTE_PASSWORD_FILE ("com.sun.management.jmxremote.password.file"), + + /** Port number to enable JMX RMI connections - com.sun.management.jmxremote.port */ + COM_SUN_MANAGEMENT_JMXREMOTE_PORT ("com.sun.management.jmxremote.port"), + + /** + * A comma-delimited list of SSL/TLS protocol versions to enable. + * Used in conjunction with com.sun.management.jmxremote.ssl - com.sun.management.jmxremote.ssl.enabled.protocols + */ + COM_SUN_MANAGEMENT_JMXREMOTE_SSL_ENABLED_PROTOCOLS ("com.sun.management.jmxremote.ssl.enabled.protocols"), + + /** + * A comma-delimited list of SSL/TLS cipher suites to enable. + * Used in conjunction with com.sun.management.jmxremote.ssl - com.sun.management.jmxremote.ssl.enabled.cipher.suites + */ + COM_SUN_MANAGEMENT_JMXREMOTE_SSL_ENABLED_CIPHER_SUITES ("com.sun.management.jmxremote.ssl.enabled.cipher.suites"), + + /** mx4jaddress */ + MX4JADDRESS ("mx4jaddress"), + + /** mx4jport */ + MX4JPORT ("mx4jport"), + ++ /** ++ * When bootstraping we wait for all schema versions found in gossip to be seen, and if not seen in time we fail ++ * the bootstrap; this property will avoid failing and allow bootstrap to continue if set to true. ++ */ ++ BOOTSTRAP_SKIP_SCHEMA_CHECK("cassandra.skip_schema_check"), ++ ++ /** ++ * When bootstraping how long to wait for schema versions to be seen. ++ */ ++ BOOTSTRAP_SCHEMA_DELAY_MS("cassandra.schema_delay_ms"), ++ + //cassandra properties (without the "cassandra." prefix) + + /** + * The cassandra-foreground option will tell CassandraDaemon whether + * to close stdout/stderr, but it's up to us not to background. + * yes/null + */ + CASSANDRA_FOREGROUND ("cassandra-foreground"), + + DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES ("default.provide.overlapping.tombstones"), + ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION ("org.apache.cassandra.disable_mbean_registration"), + //only for testing + ORG_APACHE_CASSANDRA_CONF_CASSANDRA_RELEVANT_PROPERTIES_TEST("org.apache.cassandra.conf.CassandraRelevantPropertiesTest"), + ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"), + + /** This property indicates whether disable_mbean_registration is true */ + IS_DISABLED_MBEAN_REGISTRATION("org.apache.cassandra.disable_mbean_registration"); + + CassandraRelevantProperties(String key, String defaultVal) + { + this.key = key; + this.defaultVal = defaultVal; + } + + CassandraRelevantProperties(String key) + { + this.key = key; + this.defaultVal = null; + } + + private final String key; + private final String defaultVal; + + public String getKey() + { + return key; + } + + /** + * Gets the value of the indicated system property. + * @return system property value if it exists, defaultValue otherwise. + */ + public String getString() + { + String value = System.getProperty(key); + + return value == null ? defaultVal : STRING_CONVERTER.convert(value); + } + + /** + * Gets the value of a system property as a boolean. + * @return system property boolean value if it exists, false otherwise(). + */ + public boolean getBoolean() + { + String value = System.getProperty(key); + + return BOOLEAN_CONVERTER.convert(value == null ? defaultVal : value); + } + + /** + * Gets the value of a system property as a int. + * @return system property int value if it exists, defaultValue otherwise. + */ + public int getInt() + { + String value = System.getProperty(key); + + return INTEGER_CONVERTER.convert(value == null ? defaultVal : value); + } + + private interface PropertyConverter<T> + { + T convert(String value); + } + + private static final PropertyConverter<String> STRING_CONVERTER = value -> value; + + private static final PropertyConverter<Boolean> BOOLEAN_CONVERTER = Boolean::parseBoolean; + + private static final PropertyConverter<Integer> INTEGER_CONVERTER = value -> + { + try + { + return Integer.decode(value); + } + catch (NumberFormatException e) + { + throw new ConfigurationException(String.format("Invalid value for system property: " + + "expected integer value but got '%s'", value)); + } + }; + + /** + * @return whether a system property is present or not. + */ + public boolean isPresent() + { + return System.getProperties().containsKey(key); + } +} + diff --cc src/java/org/apache/cassandra/service/StorageService.java index 3201d80,eb13df1..7d27163 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -108,24 -96,15 +108,26 @@@ import org.apache.cassandra.utils.* import org.apache.cassandra.utils.logging.LoggingSupportFactory; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventType; +import org.apache.cassandra.utils.progress.ProgressListener; +import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor; import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport; -import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport; +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Iterables.tryFind; import static java.util.Arrays.asList; +import static java.util.Arrays.stream; import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; ++import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS; ++import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK; import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName; import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily; -import static org.apache.cassandra.service.MigrationManager.evolveSystemKeyspace; +import static org.apache.cassandra.net.NoPayload.noPayload; +import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ; +import static org.apache.cassandra.schema.MigrationManager.evolveSystemKeyspace; /** * This abstraction contains the token/identifier of this node @@@ -137,11 -116,10 +139,11 @@@ public class StorageService extends Not { private static final Logger logger = LoggerFactory.getLogger(StorageService.class); + public static final int INDEFINITE = -1; public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized - public static final int SCHEMA_DELAY = getRingDelay(); // delay after which we assume ring has stablized + public static final int SCHEMA_DELAY_MILLIS = getSchemaDelay(); -- private static final boolean REQUIRE_SCHEMAS = !Boolean.getBoolean("cassandra.skip_schema_check"); ++ private static final boolean REQUIRE_SCHEMAS = !BOOTSTRAP_SKIP_SCHEMA_CHECK.getBoolean(); private final JMXProgressSupport progressSupport = new JMXProgressSupport(this); @@@ -161,10 -146,10 +163,10 @@@ private static int getSchemaDelay() { -- String newdelay = System.getProperty("cassandra.schema_delay_ms"); ++ String newdelay = BOOTSTRAP_SCHEMA_DELAY_MS.getString(); if (newdelay != null) { - logger.info("Overriding SCHEMA_DELAY to {}ms", newdelay); + logger.info("Overriding SCHEMA_DELAY_MILLIS to {}ms", newdelay); return Integer.parseInt(newdelay); } else diff --cc test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java index 6dda98e,0000000..229cb39 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java @@@ -1,461 -1,0 +1,466 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.action; + +import java.io.IOException; +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.distributed.shared.VersionedApplicationState; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.MigrationCoordinator; +import org.apache.cassandra.schema.MigrationManager; +import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort; + +public class GossipHelper +{ + public static InstanceAction statusToBootstrap(IInvokableInstance newNode) + { + return (instance) -> + { + changeGossipState(instance, + newNode, + Arrays.asList(tokens(newNode), + statusBootstrapping(newNode), + statusWithPortBootstrapping(newNode))); + }; + } + + public static InstanceAction statusToNormal(IInvokableInstance peer) + { + return (target) -> + { + changeGossipState(target, + peer, + Arrays.asList(tokens(peer), + statusNormal(peer), + releaseVersion(peer), + netVersion(peer), + statusWithPortNormal(peer))); + }; + } + + /** + * This method is unsafe and should be used _only_ when gossip is not used or available: it creates versioned values on the + * target instance, which means Gossip versioning gets out of sync. Use a safe couterpart at all times when performing _any_ + * ring movement operations _or_ if Gossip is used. + */ + public static void unsafeStatusToNormal(IInvokableInstance target, IInstance peer) + { + int messagingVersion = peer.getMessagingVersion(); + changeGossipState(target, + peer, + Arrays.asList(unsafeVersionedValue(target, + ApplicationState.TOKENS, + (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens), + peer.config().getString("partitioner"), + peer.config().getString("initial_token")), + unsafeVersionedValue(target, + ApplicationState.STATUS, + (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens), + peer.config().getString("partitioner"), + peer.config().getString("initial_token")), + unsafeVersionedValue(target, + ApplicationState.STATUS_WITH_PORT, + (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens), + peer.config().getString("partitioner"), + peer.config().getString("initial_token")), + unsafeVersionedValue(target, + ApplicationState.NET_VERSION, + (partitioner) -> new VersionedValue.VersionedValueFactory(partitioner).networkVersion(messagingVersion), + peer.config().getString("partitioner")), + unsafeReleaseVersion(target, + peer.config().getString("partitioner"), + peer.getReleaseVersionString()))); + } + + public static InstanceAction statusToLeaving(IInvokableInstance newNode) + { + return (instance) -> { + changeGossipState(instance, + newNode, + Arrays.asList(tokens(newNode), + statusLeaving(newNode), + statusWithPortLeaving(newNode))); + }; + } + + public static InstanceAction bootstrap() + { + return new BootstrapAction(); + } + + public static InstanceAction bootstrap(boolean joinRing, Duration waitForBootstrap, Duration waitForSchema) + { + return new BootstrapAction(joinRing, waitForBootstrap, waitForSchema); + } + + public static InstanceAction disseminateGossipState(IInvokableInstance newNode) + { + return new DisseminateGossipState(newNode); + } + + public static InstanceAction pullSchemaFrom(IInvokableInstance pullFrom) + { + return new PullSchemaFrom(pullFrom); + } + + private static InstanceAction disableBinary() + { + return (instance) -> instance.nodetoolResult("disablebinary").asserts().success(); + } + + private static class DisseminateGossipState implements InstanceAction + { + final Map<InetSocketAddress, byte[]> gossipState; + + public DisseminateGossipState(IInvokableInstance... from) + { + gossipState = new HashMap<>(); + for (IInvokableInstance node : from) + { + byte[] epBytes = node.callsOnInstance(() -> { + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort()); + return toBytes(epState); + }).call(); + gossipState.put(node.broadcastAddress(), epBytes); + } + } + + public void accept(IInvokableInstance instance) + { + instance.appliesOnInstance((IIsolatedExecutor.SerializableFunction<Map<InetSocketAddress, byte[]>, Void>) + (map) -> { + Map<InetAddressAndPort, EndpointState> newState = new HashMap<>(); + for (Map.Entry<InetSocketAddress, byte[]> e : map.entrySet()) + newState.put(toCassandraInetAddressAndPort(e.getKey()), fromBytes(e.getValue())); + + Gossiper.runInGossipStageBlocking(() -> { + Gossiper.instance.applyStateLocally(newState); + }); + return null; + }).apply(gossipState); + } + } + + private static byte[] toBytes(EndpointState epState) + { + try (DataOutputBuffer out = new DataOutputBuffer(1024)) + { + EndpointState.serializer.serialize(epState, out, MessagingService.current_version); + return out.toByteArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + private static EndpointState fromBytes(byte[] bytes) + { + try (DataInputBuffer in = new DataInputBuffer(bytes)) + { + return EndpointState.serializer.deserialize(in, MessagingService.current_version); + } + catch (Throwable t) + { + throw new RuntimeException(t); + } + } + + private static class PullSchemaFrom implements InstanceAction + { + final InetSocketAddress pullFrom; + + public PullSchemaFrom(IInvokableInstance pullFrom) + { + this.pullFrom = pullFrom.broadcastAddress();; + } + + public void accept(IInvokableInstance pullTo) + { + pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> { + InetAddressAndPort endpoint = toCassandraInetAddressAndPort(pullFrom); + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + MigrationCoordinator.instance.reportEndpointVersion(endpoint, state); + MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10)); + }).accept(pullFrom); + } + } + + private static class BootstrapAction implements InstanceAction, Serializable + { + private final boolean joinRing; + private final Duration waitForBootstrap; + private final Duration waitForSchema; + + public BootstrapAction() + { + this(true, Duration.ofMinutes(10), Duration.ofSeconds(10)); + } + + public BootstrapAction(boolean joinRing, Duration waitForBootstrap, Duration waitForSchema) + { + this.joinRing = joinRing; + this.waitForBootstrap = waitForBootstrap; + this.waitForSchema = waitForSchema; + } + + public void accept(IInvokableInstance instance) + { + instance.appliesOnInstance((String partitionerString, String tokenString) -> { + IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString); + List<Token> tokens = Collections.singletonList(partitioner.getTokenFactory().fromString(tokenString)); + try + { + Collection<InetAddressAndPort> collisions = StorageService.instance.prepareForBootstrap(waitForSchema.toMillis()); + assert collisions.size() == 0 : String.format("Didn't expect any replacements but got %s", collisions); + boolean isBootstrapSuccessful = StorageService.instance.bootstrap(tokens, waitForBootstrap.toMillis()); + assert isBootstrapSuccessful : "Bootstrap did not complete successfully"; + StorageService.instance.setUpDistributedSystemKeyspaces(); + if (joinRing) + StorageService.instance.finishJoiningRing(true, tokens); + } + catch (Throwable t) + { + throw new RuntimeException(t); + } + + return null; + }).apply(instance.config().getString("partitioner"), instance.config().getString("initial_token")); + } + } + + public static InstanceAction decomission() + { + return (target) -> target.nodetoolResult("decommission").asserts().success(); + } + + + public static VersionedApplicationState tokens(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.TOKENS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens)); + } + + public static VersionedApplicationState netVersion(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.NET_VERSION, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).networkVersion()); + } + + private static VersionedApplicationState unsafeReleaseVersion(IInvokableInstance instance, String partitionerStr, String releaseVersionStr) + { + return unsafeVersionedValue(instance, ApplicationState.RELEASE_VERSION, (partitioner) -> new VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersionStr), partitionerStr); + } + + public static VersionedApplicationState releaseVersion(IInvokableInstance instance) + { + return unsafeReleaseVersion(instance, instance.config().getString("partitioner"), instance.getReleaseVersionString()); + } + + public static VersionedApplicationState statusNormal(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens)); + } + + public static VersionedApplicationState statusWithPortNormal(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens)); + } + + public static VersionedApplicationState statusBootstrapping(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens)); + } + + public static VersionedApplicationState statusWithPortBootstrapping(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens)); + } + + public static VersionedApplicationState statusLeaving(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens)); + } + + public static VersionedApplicationState statusLeft(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> { + return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, System.currentTimeMillis() + Gossiper.aVeryLongTime); + }); + } + + public static VersionedApplicationState statusWithPortLeft(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> { + return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, System.currentTimeMillis() + Gossiper.aVeryLongTime); + + }); + } + + public static VersionedApplicationState statusWithPortLeaving(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens)); + } + + public static VersionedValue toVersionedValue(VersionedApplicationState vv) + { + return VersionedValue.unsafeMakeVersionedValue(vv.value, vv.version); + } + + public static ApplicationState toApplicationState(VersionedApplicationState vv) + { + return ApplicationState.values()[vv.applicationState]; + } + + private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance, + ApplicationState applicationState, + IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier, + String partitionerStr, String initialTokenStr) + { + return instance.appliesOnInstance((String partitionerString, String tokenString) -> { + IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString); + Token token = partitioner.getTokenFactory().fromString(tokenString); + + VersionedValue versionedValue = supplier.apply(partitioner, Collections.singleton(token)); + return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version); + }).apply(partitionerStr, initialTokenStr); + } + + private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance, + ApplicationState applicationState, + IIsolatedExecutor.SerializableFunction<IPartitioner, VersionedValue> supplier, + String partitionerStr) + { + return instance.appliesOnInstance((String partitionerString) -> { + IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString); + VersionedValue versionedValue = supplier.apply(partitioner); + return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version); + }).apply(partitionerStr); + } + + public static VersionedApplicationState versionedToken(IInvokableInstance instance, ApplicationState applicationState, IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier) + { + return unsafeVersionedValue(instance, applicationState, supplier, instance.config().getString("partitioner"), instance.config().getString("initial_token")); + } + + public static InstanceAction removeFromRing(IInvokableInstance peer) + { + return (target) -> { + InetAddressAndPort endpoint = toCassandraInetAddressAndPort(peer.broadcastAddress()); + VersionedApplicationState newState = statusLeft(peer); + + target.runOnInstance(() -> { + // state to 'left' + EndpointState currentState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + ApplicationState as = toApplicationState(newState); + VersionedValue vv = toVersionedValue(newState); + currentState.addApplicationState(as, vv); + StorageService.instance.onChange(endpoint, as, vv); + + // remove from gossip + Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.unsafeAnulEndpoint(endpoint)); + SystemKeyspace.removeEndpoint(endpoint); + PendingRangeCalculatorService.instance.update(); + PendingRangeCalculatorService.instance.blockUntilFinished(); + }); + }; + } + + /** + * Changes gossip state of the `peer` on `target` + */ + public static void changeGossipState(IInvokableInstance target, IInstance peer, List<VersionedApplicationState> newState) + { + InetSocketAddress addr = peer.broadcastAddress(); + UUID hostId = peer.config().hostId(); + int netVersion = peer.getMessagingVersion(); + target.runOnInstance(() -> { + InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr); + StorageService storageService = StorageService.instance; + + Gossiper.runInGossipStageBlocking(() -> { + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state == null) + { + Gossiper.instance.initializeNodeUnsafe(endpoint, hostId, netVersion, 1); + state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state.isAlive() && !Gossiper.instance.isDeadState(state)) + Gossiper.instance.realMarkAlive(endpoint, state); + } + + for (VersionedApplicationState value : newState) + { + ApplicationState as = toApplicationState(value); + VersionedValue vv = toVersionedValue(value); + state.addApplicationState(as, vv); + storageService.onChange(endpoint, as, vv); + } + }); + }); + } + + public static void withProperty(String prop, boolean value, Runnable r) + { ++ withProperty(prop, Boolean.toString(value), r); ++ } ++ ++ public static void withProperty(String prop, String value, Runnable r) ++ { + String before = System.getProperty(prop); + try + { - System.setProperty(prop, Boolean.toString(value)); ++ System.setProperty(prop, value); + r.run(); + } + finally + { + if (before == null) + System.clearProperty(prop); + else + System.setProperty(prop, before); + } + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java index e0c5a78,0000000..15a8d00 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java @@@ -1,130 -1,0 +1,132 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.ring; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.test.TestBaseImpl; + +import static java.util.Arrays.asList; ++import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS; +import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap; +import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom; +import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap; +import static org.apache.cassandra.distributed.action.GossipHelper.withProperty; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class BootstrapTest extends TestBaseImpl +{ + @Test + public void bootstrapTest() throws Throwable + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + + try (Cluster cluster = builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start()) + { + populate(cluster,0, 100); + + IInstanceConfig config = cluster.newInstanceConfig(); + IInvokableInstance newInstance = cluster.bootstrap(config); + withProperty("cassandra.join_ring", false, + () -> newInstance.startup(cluster)); + + cluster.forEach(statusToBootstrap(newInstance)); + + cluster.run(asList(pullSchemaFrom(cluster.get(1)), + bootstrap()), + newInstance.config().num()); + + for (Map.Entry<Integer, Long> e : count(cluster).entrySet()) + Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", + 100L, + e.getValue().longValue()); + } + } + + @Test + public void autoBootstrapTest() throws Throwable + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + + try (Cluster cluster = builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start()) + { + populate(cluster,0, 100); + + IInstanceConfig config = cluster.newInstanceConfig(); + config.set("auto_bootstrap", true); + IInvokableInstance newInstance = cluster.bootstrap(config); - withProperty("cassandra.join_ring", false, - () -> newInstance.startup(cluster)); ++ withProperty(BOOTSTRAP_SCHEMA_DELAY_MS.getKey(), Integer.toString(90 * 1000), ++ () -> withProperty("cassandra.join_ring", false, ++ () -> newInstance.startup(cluster))); + + newInstance.nodetoolResult("join").asserts().success(); + + for (Map.Entry<Integer, Long> e : count(cluster).entrySet()) + Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", e.getValue().longValue(), 100L); + } + } + + public static void populate(ICluster cluster, int from, int to) + { + populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM); + } + + public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl) + { + cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"); + cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + for (int i = from; i < to; i++) + { + cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)", + cl, + i, i, i); + } + } + + public static Map<Integer, Long> count(ICluster cluster) + { + return IntStream.rangeClosed(1, cluster.size()) + .boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0])); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org