This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e4fac3582e0a9dda182313a3aa784be35d965f4e
Merge: 94663c3 50d8245
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Fri Nov 13 12:37:46 2020 -0800

    Merge branch 'cassandra-3.11' into trunk

 .../apache/cassandra/config/CassandraRelevantProperties.java | 11 +++++++++++
 src/java/org/apache/cassandra/service/StorageService.java    | 12 +++++++-----
 .../apache/cassandra/distributed/action/GossipHelper.java    |  7 ++++++-
 .../cassandra/distributed/test/ring/BootstrapTest.java       |  6 ++++--
 4 files changed, 28 insertions(+), 8 deletions(-)

diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 881b7d9,0000000..7402aa1
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@@ -1,240 -1,0 +1,251 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.config;
 +
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +
 +/** A class that extracts system properties for the cassandra node it runs 
within. */
 +public enum CassandraRelevantProperties
 +{
 +    //base JVM properties
 +    JAVA_HOME("java.home"),
 +    CASSANDRA_PID_FILE ("cassandra-pidfile"),
 +
 +    /**
 +     * Indicates the temporary directory used by the Java Virtual Machine 
(JVM)
 +     * to create and store temporary files.
 +     */
 +    JAVA_IO_TMPDIR ("java.io.tmpdir"),
 +
 +    /**
 +     * Path from which to load native libraries.
 +     * Default is absolute path to lib directory.
 +     */
 +    JAVA_LIBRARY_PATH ("java.library.path"),
 +
 +    JAVA_SECURITY_EGD ("java.security.egd"),
 +
 +    /** Java Runtime Environment version */
 +    JAVA_VERSION ("java.version"),
 +
 +    /** Java Virtual Machine implementation name */
 +    JAVA_VM_NAME ("java.vm.name"),
 +
 +    /** Line separator ("\n" on UNIX). */
 +    LINE_SEPARATOR ("line.separator"),
 +
 +    /** Java class path. */
 +    JAVA_CLASS_PATH ("java.class.path"),
 +
 +    /** Operating system architecture. */
 +    OS_ARCH ("os.arch"),
 +
 +    /** Operating system name. */
 +    OS_NAME ("os.name"),
 +
 +    /** User's home directory. */
 +    USER_HOME ("user.home"),
 +
 +    /** Platform word size sun.arch.data.model. Examples: "32", "64", 
"unknown"*/
 +    SUN_ARCH_DATA_MODEL ("sun.arch.data.model"),
 +
 +    //JMX properties
 +    /**
 +     * The value of this property represents the host name string
 +     * that should be associated with remote stubs for locally created remote 
objects,
 +     * in order to allow clients to invoke methods on the remote object.
 +     */
 +    JAVA_RMI_SERVER_HOSTNAME ("java.rmi.server.hostname"),
 +
 +    /**
 +     * If this value is true, object identifiers for remote objects exported 
by this VM will be generated by using
 +     * a cryptographically secure random number generator. The default value 
is false.
 +     */
 +    JAVA_RMI_SERVER_RANDOM_ID ("java.rmi.server.randomIDs"),
 +
 +    /**
 +     * This property indicates whether password authentication for remote 
monitoring is
 +     * enabled. By default it is disabled - 
com.sun.management.jmxremote.authenticate
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_AUTHENTICATE 
("com.sun.management.jmxremote.authenticate"),
 +
 +    /**
 +     * The port number to which the RMI connector will be bound - 
com.sun.management.jmxremote.rmi.port.
 +     * An Integer object that represents the value of the second argument is 
returned
 +     * if there is no port specified, if the port does not have the correct 
numeric format,
 +     * or if the specified name is empty or null.
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_RMI_PORT 
("com.sun.management.jmxremote.rmi.port", "0"),
 +
 +    /** Cassandra jmx remote port */
 +    CASSANDRA_JMX_REMOTE_PORT("cassandra.jmx.remote.port"),
 +
 +    /** This property  indicates whether SSL is enabled for monitoring 
remotely. Default is set to false. */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_SSL ("com.sun.management.jmxremote.ssl"),
 +
 +    /**
 +     * This property indicates whether SSL client authentication is enabled - 
com.sun.management.jmxremote.ssl.need.client.auth.
 +     * Default is set to false.
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_SSL_NEED_CLIENT_AUTH 
("com.sun.management.jmxremote.ssl.need.client.auth"),
 +
 +    /**
 +     * This property indicates the location for the access file. If 
com.sun.management.jmxremote.authenticate is false,
 +     * then this property and the password and access files, are ignored. 
Otherwise, the access file must exist and
 +     * be in the valid format. If the access file is empty or nonexistent, 
then no access is allowed.
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_ACCESS_FILE 
("com.sun.management.jmxremote.access.file"),
 +
 +    /** This property indicates the path to the password file - 
com.sun.management.jmxremote.password.file */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_PASSWORD_FILE 
("com.sun.management.jmxremote.password.file"),
 +
 +    /** Port number to enable JMX RMI connections - 
com.sun.management.jmxremote.port */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_PORT ("com.sun.management.jmxremote.port"),
 +
 +    /**
 +     * A comma-delimited list of SSL/TLS protocol versions to enable.
 +     * Used in conjunction with com.sun.management.jmxremote.ssl - 
com.sun.management.jmxremote.ssl.enabled.protocols
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_SSL_ENABLED_PROTOCOLS 
("com.sun.management.jmxremote.ssl.enabled.protocols"),
 +
 +    /**
 +     * A comma-delimited list of SSL/TLS cipher suites to enable.
 +     * Used in conjunction with com.sun.management.jmxremote.ssl - 
com.sun.management.jmxremote.ssl.enabled.cipher.suites
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_SSL_ENABLED_CIPHER_SUITES 
("com.sun.management.jmxremote.ssl.enabled.cipher.suites"),
 +
 +    /** mx4jaddress */
 +    MX4JADDRESS ("mx4jaddress"),
 +
 +    /** mx4jport */
 +    MX4JPORT ("mx4jport"),
 +
++    /**
++     * When bootstraping we wait for all schema versions found in gossip to 
be seen, and if not seen in time we fail
++     * the bootstrap; this property will avoid failing and allow bootstrap to 
continue if set to true.
++     */
++    BOOTSTRAP_SKIP_SCHEMA_CHECK("cassandra.skip_schema_check"),
++
++    /**
++     * When bootstraping how long to wait for schema versions to be seen.
++     */
++    BOOTSTRAP_SCHEMA_DELAY_MS("cassandra.schema_delay_ms"),
++
 +    //cassandra properties (without the "cassandra." prefix)
 +
 +    /**
 +     * The cassandra-foreground option will tell CassandraDaemon whether
 +     * to close stdout/stderr, but it's up to us not to background.
 +     * yes/null
 +     */
 +    CASSANDRA_FOREGROUND ("cassandra-foreground"),
 +
 +    DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES 
("default.provide.overlapping.tombstones"),
 +    ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION 
("org.apache.cassandra.disable_mbean_registration"),
 +    //only for testing
 +    
ORG_APACHE_CASSANDRA_CONF_CASSANDRA_RELEVANT_PROPERTIES_TEST("org.apache.cassandra.conf.CassandraRelevantPropertiesTest"),
 +    
ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"),
 +
 +    /** This property indicates whether disable_mbean_registration is true */
 +    
IS_DISABLED_MBEAN_REGISTRATION("org.apache.cassandra.disable_mbean_registration");
 +
 +    CassandraRelevantProperties(String key, String defaultVal)
 +    {
 +        this.key = key;
 +        this.defaultVal = defaultVal;
 +    }
 +
 +    CassandraRelevantProperties(String key)
 +    {
 +        this.key = key;
 +        this.defaultVal = null;
 +    }
 +
 +    private final String key;
 +    private final String defaultVal;
 +
 +    public String getKey()
 +    {
 +        return key;
 +    }
 +
 +    /**
 +     * Gets the value of the indicated system property.
 +     * @return system property value if it exists, defaultValue otherwise.
 +     */
 +    public String getString()
 +    {
 +        String value = System.getProperty(key);
 +
 +        return value == null ? defaultVal : STRING_CONVERTER.convert(value);
 +    }
 +
 +    /**
 +     * Gets the value of a system property as a boolean.
 +     * @return system property boolean value if it exists, false otherwise().
 +     */
 +    public boolean getBoolean()
 +    {
 +        String value = System.getProperty(key);
 +
 +        return BOOLEAN_CONVERTER.convert(value == null ? defaultVal : value);
 +    }
 +
 +    /**
 +     * Gets the value of a system property as a int.
 +     * @return system property int value if it exists, defaultValue otherwise.
 +     */
 +    public int getInt()
 +    {
 +        String value = System.getProperty(key);
 +
 +        return INTEGER_CONVERTER.convert(value == null ? defaultVal : value);
 +    }
 +
 +    private interface PropertyConverter<T>
 +    {
 +        T convert(String value);
 +    }
 +
 +    private static final PropertyConverter<String> STRING_CONVERTER = value 
-> value;
 +
 +    private static final PropertyConverter<Boolean> BOOLEAN_CONVERTER = 
Boolean::parseBoolean;
 +
 +    private static final PropertyConverter<Integer> INTEGER_CONVERTER = value 
->
 +    {
 +        try
 +        {
 +            return Integer.decode(value);
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new ConfigurationException(String.format("Invalid value for 
system property: " +
 +                                                           "expected integer 
value but got '%s'", value));
 +        }
 +    };
 +
 +    /**
 +     * @return whether a system property is present or not.
 +     */
 +    public boolean isPresent()
 +    {
 +        return System.getProperties().containsKey(key);
 +    }
 +}
 +
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 3201d80,eb13df1..7d27163
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -108,24 -96,15 +108,26 @@@ import org.apache.cassandra.utils.*
  import org.apache.cassandra.utils.logging.LoggingSupportFactory;
  import org.apache.cassandra.utils.progress.ProgressEvent;
  import org.apache.cassandra.utils.progress.ProgressEventType;
 +import org.apache.cassandra.utils.progress.ProgressListener;
 +import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
  import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
 -import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport;
  
 +import static com.google.common.collect.Iterables.transform;
 +import static com.google.common.collect.Iterables.tryFind;
  import static java.util.Arrays.asList;
 +import static java.util.Arrays.stream;
  import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
  import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toMap;
++import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
++import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
  import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
  import static 
org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
 -import static 
org.apache.cassandra.service.MigrationManager.evolveSystemKeyspace;
 +import static org.apache.cassandra.net.NoPayload.noPayload;
 +import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
 +import static 
org.apache.cassandra.schema.MigrationManager.evolveSystemKeyspace;
  
  /**
   * This abstraction contains the token/identifier of this node
@@@ -137,11 -116,10 +139,11 @@@ public class StorageService extends Not
  {
      private static final Logger logger = 
LoggerFactory.getLogger(StorageService.class);
  
 +    public static final int INDEFINITE = -1;
      public static final int RING_DELAY = getRingDelay(); // delay after which 
we assume ring has stablized
-     public static final int SCHEMA_DELAY = getRingDelay(); // delay after 
which we assume ring has stablized
+     public static final int SCHEMA_DELAY_MILLIS = getSchemaDelay();
  
--    private static final boolean REQUIRE_SCHEMAS = 
!Boolean.getBoolean("cassandra.skip_schema_check");
++    private static final boolean REQUIRE_SCHEMAS = 
!BOOTSTRAP_SKIP_SCHEMA_CHECK.getBoolean();
  
      private final JMXProgressSupport progressSupport = new 
JMXProgressSupport(this);
  
@@@ -161,10 -146,10 +163,10 @@@
  
      private static int getSchemaDelay()
      {
--        String newdelay = System.getProperty("cassandra.schema_delay_ms");
++        String newdelay = BOOTSTRAP_SCHEMA_DELAY_MS.getString();
          if (newdelay != null)
          {
-             logger.info("Overriding SCHEMA_DELAY to {}ms", newdelay);
+             logger.info("Overriding SCHEMA_DELAY_MILLIS to {}ms", newdelay);
              return Integer.parseInt(newdelay);
          }
          else
diff --cc 
test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index 6dda98e,0000000..229cb39
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@@ -1,461 -1,0 +1,466 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.action;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.net.InetSocketAddress;
 +import java.time.Duration;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Supplier;
 +
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.api.IInstance;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.distributed.shared.VersionedApplicationState;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.MigrationCoordinator;
 +import org.apache.cassandra.schema.MigrationManager;
 +import org.apache.cassandra.service.PendingRangeCalculatorService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static 
org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
 +
 +public class GossipHelper
 +{
 +    public static InstanceAction statusToBootstrap(IInvokableInstance newNode)
 +    {
 +        return (instance) ->
 +        {
 +            changeGossipState(instance,
 +                              newNode,
 +                              Arrays.asList(tokens(newNode),
 +                                            statusBootstrapping(newNode),
 +                                            
statusWithPortBootstrapping(newNode)));
 +        };
 +    }
 +
 +    public static InstanceAction statusToNormal(IInvokableInstance peer)
 +    {
 +        return (target) ->
 +        {
 +            changeGossipState(target,
 +                              peer,
 +                              Arrays.asList(tokens(peer),
 +                                            statusNormal(peer),
 +                                            releaseVersion(peer),
 +                                            netVersion(peer),
 +                                            statusWithPortNormal(peer)));
 +        };
 +    }
 +
 +    /**
 +     * This method is unsafe and should be used _only_ when gossip is not 
used or available: it creates versioned values on the
 +     * target instance, which means Gossip versioning gets out of sync. Use a 
safe couterpart at all times when performing _any_
 +     * ring movement operations _or_ if Gossip is used.
 +     */
 +    public static void unsafeStatusToNormal(IInvokableInstance target, 
IInstance peer)
 +    {
 +        int messagingVersion = peer.getMessagingVersion();
 +        changeGossipState(target,
 +                          peer,
 +                          Arrays.asList(unsafeVersionedValue(target,
 +                                                             
ApplicationState.TOKENS,
 +                                                             (partitioner, 
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens),
 +                                                             
peer.config().getString("partitioner"),
 +                                                             
peer.config().getString("initial_token")),
 +                                        unsafeVersionedValue(target,
 +                                                             
ApplicationState.STATUS,
 +                                                             (partitioner, 
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
 +                                                             
peer.config().getString("partitioner"),
 +                                                             
peer.config().getString("initial_token")),
 +                                        unsafeVersionedValue(target,
 +                                                             
ApplicationState.STATUS_WITH_PORT,
 +                                                             (partitioner, 
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
 +                                                             
peer.config().getString("partitioner"),
 +                                                             
peer.config().getString("initial_token")),
 +                                        unsafeVersionedValue(target,
 +                                                             
ApplicationState.NET_VERSION,
 +                                                             (partitioner) -> 
new 
VersionedValue.VersionedValueFactory(partitioner).networkVersion(messagingVersion),
 +                                                             
peer.config().getString("partitioner")),
 +                                        unsafeReleaseVersion(target,
 +                                                             
peer.config().getString("partitioner"),
 +                                                             
peer.getReleaseVersionString())));
 +    }
 +
 +    public static InstanceAction statusToLeaving(IInvokableInstance newNode)
 +    {
 +        return (instance) -> {
 +            changeGossipState(instance,
 +                              newNode,
 +                              Arrays.asList(tokens(newNode),
 +                                            statusLeaving(newNode),
 +                                            statusWithPortLeaving(newNode)));
 +        };
 +    }
 +
 +    public static InstanceAction bootstrap()
 +    {
 +        return new BootstrapAction();
 +    }
 +
 +    public static InstanceAction bootstrap(boolean joinRing, Duration 
waitForBootstrap, Duration waitForSchema)
 +    {
 +        return new BootstrapAction(joinRing, waitForBootstrap, waitForSchema);
 +    }
 +
 +    public static InstanceAction disseminateGossipState(IInvokableInstance 
newNode)
 +    {
 +        return new DisseminateGossipState(newNode);
 +    }
 +
 +    public static InstanceAction pullSchemaFrom(IInvokableInstance pullFrom)
 +    {
 +        return new PullSchemaFrom(pullFrom);
 +    }
 +
 +    private static InstanceAction disableBinary()
 +    {
 +        return (instance) -> 
instance.nodetoolResult("disablebinary").asserts().success();
 +    }
 +
 +    private static class DisseminateGossipState implements InstanceAction
 +    {
 +        final Map<InetSocketAddress, byte[]> gossipState;
 +
 +        public DisseminateGossipState(IInvokableInstance... from)
 +        {
 +            gossipState = new HashMap<>();
 +            for (IInvokableInstance node : from)
 +            {
 +                byte[] epBytes = node.callsOnInstance(() -> {
 +                    EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort());
 +                    return toBytes(epState);
 +                }).call();
 +                gossipState.put(node.broadcastAddress(), epBytes);
 +            }
 +        }
 +
 +        public void accept(IInvokableInstance instance)
 +        {
 +            
instance.appliesOnInstance((IIsolatedExecutor.SerializableFunction<Map<InetSocketAddress,
 byte[]>, Void>)
 +                                       (map) -> {
 +                                           Map<InetAddressAndPort, 
EndpointState> newState = new HashMap<>();
 +                                           for (Map.Entry<InetSocketAddress, 
byte[]> e : map.entrySet())
 +                                               
newState.put(toCassandraInetAddressAndPort(e.getKey()), 
fromBytes(e.getValue()));
 +
 +                                           
Gossiper.runInGossipStageBlocking(() -> {
 +                                               
Gossiper.instance.applyStateLocally(newState);
 +                                           });
 +                                           return null;
 +                                       }).apply(gossipState);
 +        }
 +    }
 +
 +    private static byte[] toBytes(EndpointState epState)
 +    {
 +        try (DataOutputBuffer out = new DataOutputBuffer(1024))
 +        {
 +            EndpointState.serializer.serialize(epState, out, 
MessagingService.current_version);
 +            return out.toByteArray();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private static EndpointState fromBytes(byte[] bytes)
 +    {
 +        try (DataInputBuffer in = new DataInputBuffer(bytes))
 +        {
 +            return EndpointState.serializer.deserialize(in, 
MessagingService.current_version);
 +        }
 +        catch (Throwable t)
 +        {
 +            throw new RuntimeException(t);
 +        }
 +    }
 +
 +    private static class PullSchemaFrom implements InstanceAction
 +    {
 +        final InetSocketAddress pullFrom;
 +
 +        public PullSchemaFrom(IInvokableInstance pullFrom)
 +        {
 +            this.pullFrom = pullFrom.broadcastAddress();;
 +        }
 +
 +        public void accept(IInvokableInstance pullTo)
 +        {
 +            pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> {
 +                InetAddressAndPort endpoint = 
toCassandraInetAddressAndPort(pullFrom);
 +                EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                MigrationCoordinator.instance.reportEndpointVersion(endpoint, 
state);
 +                
MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10));
 +            }).accept(pullFrom);
 +        }
 +    }
 +
 +    private static class BootstrapAction implements InstanceAction, 
Serializable
 +    {
 +        private final boolean joinRing;
 +        private final Duration waitForBootstrap;
 +        private final Duration waitForSchema;
 +
 +        public BootstrapAction()
 +        {
 +            this(true, Duration.ofMinutes(10), Duration.ofSeconds(10));
 +        }
 +
 +        public BootstrapAction(boolean joinRing, Duration waitForBootstrap, 
Duration waitForSchema)
 +        {
 +            this.joinRing = joinRing;
 +            this.waitForBootstrap = waitForBootstrap;
 +            this.waitForSchema = waitForSchema;
 +        }
 +
 +        public void accept(IInvokableInstance instance)
 +        {
 +            instance.appliesOnInstance((String partitionerString, String 
tokenString) -> {
 +                IPartitioner partitioner = 
FBUtilities.newPartitioner(partitionerString);
 +                List<Token> tokens = 
Collections.singletonList(partitioner.getTokenFactory().fromString(tokenString));
 +                try
 +                {
 +                    Collection<InetAddressAndPort> collisions = 
StorageService.instance.prepareForBootstrap(waitForSchema.toMillis());
 +                    assert collisions.size() == 0 : String.format("Didn't 
expect any replacements but got %s", collisions);
 +                    boolean isBootstrapSuccessful = 
StorageService.instance.bootstrap(tokens, waitForBootstrap.toMillis());
 +                    assert isBootstrapSuccessful : "Bootstrap did not 
complete successfully";
 +                    StorageService.instance.setUpDistributedSystemKeyspaces();
 +                    if (joinRing)
 +                        StorageService.instance.finishJoiningRing(true, 
tokens);
 +                }
 +                catch (Throwable t)
 +                {
 +                    throw new RuntimeException(t);
 +                }
 +
 +                return null;
 +            }).apply(instance.config().getString("partitioner"), 
instance.config().getString("initial_token"));
 +        }
 +    }
 +
 +    public static InstanceAction decomission()
 +    {
 +        return (target) -> 
target.nodetoolResult("decommission").asserts().success();
 +    }
 +
 +
 +    public static VersionedApplicationState tokens(IInvokableInstance 
instance)
 +    {
 +        return versionedToken(instance, ApplicationState.TOKENS, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).tokens(tokens));
 +    }
 +
 +    public static VersionedApplicationState netVersion(IInvokableInstance 
instance)
 +    {
 +        return versionedToken(instance, ApplicationState.NET_VERSION, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).networkVersion());
 +    }
 +
 +    private static VersionedApplicationState 
unsafeReleaseVersion(IInvokableInstance instance, String partitionerStr, String 
releaseVersionStr)
 +    {
 +        return unsafeVersionedValue(instance, 
ApplicationState.RELEASE_VERSION, (partitioner) -> new 
VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersionStr),
 partitionerStr);
 +    }
 +
 +    public static VersionedApplicationState releaseVersion(IInvokableInstance 
instance)
 +    {
 +        return unsafeReleaseVersion(instance, 
instance.config().getString("partitioner"), instance.getReleaseVersionString());
 +    }
 +
 +    public static VersionedApplicationState statusNormal(IInvokableInstance 
instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
 +    }
 +
 +    public static VersionedApplicationState 
statusWithPortNormal(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
 +    }
 +
 +    public static VersionedApplicationState 
statusBootstrapping(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
 +    }
 +
 +    public static VersionedApplicationState 
statusWithPortBootstrapping(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusLeaving(IInvokableInstance 
instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusLeft(IInvokableInstance 
instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, 
(partitioner, tokens) -> {
 +            return new 
VersionedValue.VersionedValueFactory(partitioner).left(tokens, 
System.currentTimeMillis() + Gossiper.aVeryLongTime);
 +        });
 +    }
 +
 +    public static VersionedApplicationState 
statusWithPortLeft(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, 
(partitioner, tokens) -> {
 +            return new 
VersionedValue.VersionedValueFactory(partitioner).left(tokens, 
System.currentTimeMillis() + Gossiper.aVeryLongTime);
 +
 +        });
 +    }
 +
 +    public static VersionedApplicationState 
statusWithPortLeaving(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
 +    }
 +
 +    public static VersionedValue toVersionedValue(VersionedApplicationState 
vv)
 +    {
 +        return VersionedValue.unsafeMakeVersionedValue(vv.value, vv.version);
 +    }
 +
 +    public static ApplicationState 
toApplicationState(VersionedApplicationState vv)
 +    {
 +        return ApplicationState.values()[vv.applicationState];
 +    }
 +
 +    private static VersionedApplicationState 
unsafeVersionedValue(IInvokableInstance instance,
 +                                                                 
ApplicationState applicationState,
 +                                                                 
IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, 
VersionedValue> supplier,
 +                                                                 String 
partitionerStr, String initialTokenStr)
 +    {
 +        return instance.appliesOnInstance((String partitionerString, String 
tokenString) -> {
 +            IPartitioner partitioner = 
FBUtilities.newPartitioner(partitionerString);
 +            Token token = 
partitioner.getTokenFactory().fromString(tokenString);
 +
 +            VersionedValue versionedValue = supplier.apply(partitioner, 
Collections.singleton(token));
 +            return new VersionedApplicationState(applicationState.ordinal(), 
versionedValue.value, versionedValue.version);
 +        }).apply(partitionerStr, initialTokenStr);
 +    }
 +
 +    private static VersionedApplicationState 
unsafeVersionedValue(IInvokableInstance instance,
 +                                                                 
ApplicationState applicationState,
 +                                                                 
IIsolatedExecutor.SerializableFunction<IPartitioner, VersionedValue> supplier,
 +                                                                 String 
partitionerStr)
 +    {
 +        return instance.appliesOnInstance((String partitionerString) -> {
 +            IPartitioner partitioner = 
FBUtilities.newPartitioner(partitionerString);
 +            VersionedValue versionedValue = supplier.apply(partitioner);
 +            return new VersionedApplicationState(applicationState.ordinal(), 
versionedValue.value, versionedValue.version);
 +        }).apply(partitionerStr);
 +    }
 +
 +    public static VersionedApplicationState versionedToken(IInvokableInstance 
instance, ApplicationState applicationState, 
IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, 
VersionedValue> supplier)
 +    {
 +        return unsafeVersionedValue(instance, applicationState, supplier, 
instance.config().getString("partitioner"), 
instance.config().getString("initial_token"));
 +    }
 +
 +    public static InstanceAction removeFromRing(IInvokableInstance peer)
 +    {
 +        return (target) -> {
 +            InetAddressAndPort endpoint = 
toCassandraInetAddressAndPort(peer.broadcastAddress());
 +            VersionedApplicationState newState = statusLeft(peer);
 +
 +            target.runOnInstance(() -> {
 +                // state to 'left'
 +                EndpointState currentState = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                ApplicationState as = toApplicationState(newState);
 +                VersionedValue vv = toVersionedValue(newState);
 +                currentState.addApplicationState(as, vv);
 +                StorageService.instance.onChange(endpoint, as, vv);
 +
 +                // remove from gossip
 +                Gossiper.runInGossipStageBlocking(() -> 
Gossiper.instance.unsafeAnulEndpoint(endpoint));
 +                SystemKeyspace.removeEndpoint(endpoint);
 +                PendingRangeCalculatorService.instance.update();
 +                PendingRangeCalculatorService.instance.blockUntilFinished();
 +            });
 +        };
 +    }
 +
 +    /**
 +     * Changes gossip state of the `peer` on `target`
 +     */
 +    public static void changeGossipState(IInvokableInstance target, IInstance 
peer, List<VersionedApplicationState> newState)
 +    {
 +        InetSocketAddress addr = peer.broadcastAddress();
 +        UUID hostId = peer.config().hostId();
 +        int netVersion = peer.getMessagingVersion();
 +        target.runOnInstance(() -> {
 +            InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr);
 +            StorageService storageService = StorageService.instance;
 +
 +            Gossiper.runInGossipStageBlocking(() -> {
 +                EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                if (state == null)
 +                {
 +                    Gossiper.instance.initializeNodeUnsafe(endpoint, hostId, 
netVersion, 1);
 +                    state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                    if (state.isAlive() && 
!Gossiper.instance.isDeadState(state))
 +                        Gossiper.instance.realMarkAlive(endpoint, state);
 +                }
 +
 +                for (VersionedApplicationState value : newState)
 +                {
 +                    ApplicationState as = toApplicationState(value);
 +                    VersionedValue vv = toVersionedValue(value);
 +                    state.addApplicationState(as, vv);
 +                    storageService.onChange(endpoint, as, vv);
 +                }
 +            });
 +        });
 +    }
 +
 +    public static void withProperty(String prop, boolean value, Runnable r)
 +    {
++        withProperty(prop, Boolean.toString(value), r);
++    }
++
++    public static void withProperty(String prop, String value, Runnable r)
++    {
 +        String before = System.getProperty(prop);
 +        try
 +        {
-             System.setProperty(prop, Boolean.toString(value));
++            System.setProperty(prop, value);
 +            r.run();
 +        }
 +        finally
 +        {
 +            if (before == null)
 +                System.clearProperty(prop);
 +            else
 +                System.setProperty(prop, before);
 +        }
 +    }
 +}
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index e0c5a78,0000000..15a8d00
mode 100644,000000..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@@ -1,130 -1,0 +1,132 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test.ring;
 +
 +import java.util.Map;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICluster;
 +import org.apache.cassandra.distributed.api.IInstanceConfig;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.TokenSupplier;
 +import org.apache.cassandra.distributed.shared.NetworkTopology;
 +import org.apache.cassandra.distributed.test.TestBaseImpl;
 +
 +import static java.util.Arrays.asList;
++import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
 +import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
 +import static 
org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
 +import static 
org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
 +import static 
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
 +public class BootstrapTest extends TestBaseImpl
 +{
 +    @Test
 +    public void bootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
 +                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            populate(cluster,0, 100);
 +
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
 +            withProperty("cassandra.join_ring", false,
 +                         () -> newInstance.startup(cluster));
 +
 +            cluster.forEach(statusToBootstrap(newInstance));
 +
 +            cluster.run(asList(pullSchemaFrom(cluster.get(1)),
 +                               bootstrap()),
 +                        newInstance.config().num());
 +
 +            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
 +                Assert.assertEquals("Node " + e.getKey() + " has incorrect 
row state",
 +                                    100L,
 +                                    e.getValue().longValue());
 +        }
 +    }
 +
 +    @Test
 +    public void autoBootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
 +                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            populate(cluster,0, 100);
 +
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            config.set("auto_bootstrap", true);
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
-             withProperty("cassandra.join_ring", false,
-                          () -> newInstance.startup(cluster));
++            withProperty(BOOTSTRAP_SCHEMA_DELAY_MS.getKey(), 
Integer.toString(90 * 1000),
++                         () -> withProperty("cassandra.join_ring", false,
++                                            () -> 
newInstance.startup(cluster)));
 +
 +            newInstance.nodetoolResult("join").asserts().success();
 +
 +            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
 +                Assert.assertEquals("Node " + e.getKey() + " has incorrect 
row state", e.getValue().longValue(), 100L);
 +        }
 +    }
 +
 +    public static void populate(ICluster cluster, int from, int to)
 +    {
 +        populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
 +    }
 +
 +    public static void populate(ICluster cluster, int from, int to, int 
coord, int rf, ConsistencyLevel cl)
 +    {
 +        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + 
"};");
 +        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl 
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +        for (int i = from; i < to; i++)
 +        {
 +            cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (?, ?, ?)",
 +                                               cl,
 +                                               i, i, i);
 +        }
 +    }
 +
 +    public static Map<Integer, Long> count(ICluster cluster)
 +    {
 +        return IntStream.rangeClosed(1, cluster.size())
 +                        .boxed()
 +                        .collect(Collectors.toMap(nodeId -> nodeId,
 +                                                  nodeId -> (Long) 
cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + 
".tbl")[0][0]));
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to