This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9179796dfadbbf5a60cc7490a9bc6b260b9993b0
Merge: 4fb255d efd6b60
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Thu Oct 22 13:24:57 2020 -0700

    Merge branch 'cassandra-3.11' into trunk

 .../cassandra/config/ParameterizedClass.java       |  5 ++
 .../cassandra/config/YamlConfigurationLoader.java  | 21 ++++++
 .../distributed/impl/AbstractCluster.java          | 44 +++++++++--
 .../cassandra/distributed/impl/Instance.java       | 17 ++---
 .../cassandra/distributed/impl/InstanceConfig.java | 49 +++---------
 .../cassandra/distributed/test/JVMDTestTest.java   | 88 ++++++++++++++++++++++
 .../config/YamlConfigurationLoaderTest.java        | 52 +++++++++++++
 7 files changed, 220 insertions(+), 56 deletions(-)

diff --cc 
test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 44c9744,d053075..36bcb44
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -354,13 -323,18 +354,13 @@@ public abstract class AbstractCluster<
  
      private InstanceConfig createInstanceConfig(int nodeNum)
      {
 -        String ipPrefix = "127.0." + subnet + ".";
 -        String seedIp = ipPrefix + "1";
 -        String ipAddress = ipPrefix + nodeNum;
 +        INodeProvisionStrategy provisionStrategy = 
nodeProvisionStrategy.create(subnet);
          long token = tokenSupplier.token(nodeNum);
 -
 -        NetworkTopology topology = NetworkTopology.build(ipPrefix, 
broadcastPort, nodeIdTopology);
 -
 -        InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, 
topology, root, String.valueOf(token), seedIp, datadirCount);
 +        NetworkTopology topology = buildNetworkTopology(provisionStrategy, 
nodeIdTopology);
 +        InstanceConfig config = InstanceConfig.generate(nodeNum, 
provisionStrategy, topology, root, Long.toString(token), datadirCount);
-         config.set("dtest.api.cluster_id", clusterId);
+         config.set("dtest.api.cluster_id", clusterId.toString());
          if (configUpdater != null)
              configUpdater.accept(config);
 -
          return config;
      }
  
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 04646c1,6ea2c9c..4c778f1
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -42,20 -44,19 +42,21 @@@ import javax.management.ListenerNotFoun
  import javax.management.Notification;
  import javax.management.NotificationListener;
  
 +import com.google.common.annotations.VisibleForTesting;
 +
 +import io.netty.util.concurrent.GlobalEventExecutor;
  import org.apache.cassandra.batchlog.BatchlogManager;
 +import org.apache.cassandra.concurrent.ExecutorLocals;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.SharedExecutorPool;
 -import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.config.SchemaConstants;
+ import org.apache.cassandra.config.YamlConfigurationLoader;
  import org.apache.cassandra.cql3.CQLStatement;
 +import org.apache.cassandra.cql3.QueryHandler;
  import org.apache.cassandra.cql3.QueryOptions;
  import org.apache.cassandra.cql3.QueryProcessor;
 -import org.apache.cassandra.cql3.statements.ParsedStatement;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.Memtable;
@@@ -132,15 -124,7 +133,8 @@@ import static org.apache.cassandra.dist
  
  public class Instance extends IsolatedExecutor implements IInvokableInstance
  {
-     private static final Map<Class<?>, Function<Object, Object>> mapper = new 
HashMap<Class<?>, Function<Object, Object>>() {{
-         this.put(IInstanceConfig.ParameterizedClass.class, (obj) -> {
-             IInstanceConfig.ParameterizedClass pc = 
(IInstanceConfig.ParameterizedClass) obj;
-             return new 
org.apache.cassandra.config.ParameterizedClass(pc.class_name, pc.parameters);
-         });
-     }};
- 
      public final IInstanceConfig config;
 +    private final long startedAt = System.nanoTime();
  
      // should never be invoked directly, so that it is instantiated on other 
class loader;
      // only visible for inheritance
@@@ -254,70 -235,171 +248,74 @@@
          }).run();
      }
  
 -    private void registerMockMessaging(ICluster<IInstance> cluster)
 +    private void registerMockMessaging(ICluster cluster)
      {
 -        BiConsumer<InetSocketAddress, IMessage> deliverToInstance = (to, 
message) -> cluster.get(to).receiveMessage(message);
 -        BiConsumer<InetSocketAddress, IMessage> 
deliverToInstanceIfNotFiltered = (to, message) -> {
 -            int fromNum = config().num();
 -            int toNum = cluster.get(to).config().num();
 -
 -            if (cluster.filters().permitOutbound(fromNum, toNum, message)
 -                && cluster.filters().permitInbound(fromNum, toNum, message))
 -                deliverToInstance.accept(to, message);
 -        };
 -
 -        Map<InetAddress, InetSocketAddress> addressAndPortMap = new 
HashMap<>();
 -        cluster.stream().forEach(instance -> {
 -            InetSocketAddress addressAndPort = instance.broadcastAddress();
 -            if (!addressAndPort.equals(instance.config().broadcastAddress()))
 -                throw new IllegalStateException("addressAndPort mismatch: " + 
addressAndPort + " vs " + instance.config().broadcastAddress());
 -            InetSocketAddress prev = 
addressAndPortMap.put(addressAndPort.getAddress(),
 -                                                                        
addressAndPort);
 -            if (null != prev)
 -                throw new IllegalStateException("This version of Cassandra 
does not support multiple nodes with the same InetAddress: " + addressAndPort + 
" vs " + prev);
 +        MessagingService.instance().outboundSink.add((message, to) -> {
 +            InetSocketAddress toAddr = fromCassandraInetAddressAndPort(to);
 +            
cluster.get(toAddr).receiveMessage(serializeMessage(message.from(), to, 
message));
 +            return false;
          });
 -
 -        MessagingService.instance().addMessageSink(new 
MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get));
      }
  
 -    // unnecessary if registerMockMessaging used
 -    private void registerFilters(ICluster cluster)
 +    private void registerInboundFilter(ICluster cluster)
      {
 -        IInstance instance = this;
 -        MessagingService.instance().addMessageSink(new IMessageSink()
 -        {
 -            public boolean allowOutgoingMessage(MessageOut message, int id, 
InetAddress toAddress)
 -            {
 -                if (isShutdown())
 -                    return false;
 -
 -                // Port is not passed in, so take a best guess at the 
destination port from this instance
 -                IInstance to = 
cluster.get(NetworkTopology.addressAndPort(toAddress,
 -                                                                          
instance.config().broadcastAddress().getPort()));
 -                int fromNum = config().num();
 -                int toNum = to.config().num();
 -                return cluster.filters().permitOutbound(fromNum, toNum, 
serializeMessage(message, id,
 -                                                                              
   broadcastAddress(),
 -                                                                              
   to.config().broadcastAddress()));
 -            }
 -
 -            public boolean allowIncomingMessage(MessageIn message, int id)
 -            {
 -                if (isShutdown())
 -                    return false;
 -
 -                // Port is not passed in, so take a best guess at the 
destination port from this instance
 -                IInstance from = 
cluster.get(NetworkTopology.addressAndPort(message.from,
 -                                                                            
instance.config().broadcastAddress().getPort()));
 -                int fromNum = from.config().num();
 -                int toNum = config().num();
 -
 -                IMessage msg = serializeMessage(message, id, 
from.config().broadcastAddress(), broadcastAddress());
 -
 -                return cluster.filters().permitInbound(fromNum, toNum, msg);
 -            }
 +        MessagingService.instance().inboundSink.add(message -> {
++            if (isShutdown())
++                return false;
 +            IMessage serialized = serializeMessage(message.from(), 
toCassandraInetAddressAndPort(broadcastAddress()), message);
 +            int fromNum = cluster.get(serialized.from()).config().num();
 +            int toNum = config.num(); // since this instance is receiving the 
message, to will always be this instance
 +            return cluster.filters().permitInbound(fromNum, toNum, 
serialized);
          });
      }
  
 -    public static IMessage serializeMessage(MessageOut messageOut, int id, 
InetSocketAddress from, InetSocketAddress to)
 +    private void registerOutboundFilter(ICluster cluster)
      {
 -        try (DataOutputBuffer out = new DataOutputBuffer(1024))
 -        {
 -            int version = 
MessagingService.instance().getVersion(to.getAddress());
 -
 -            out.writeInt(MessagingService.PROTOCOL_MAGIC);
 -            out.writeInt(id);
 -            long timestamp = System.currentTimeMillis();
 -            out.writeInt((int) timestamp);
 -            messageOut.serialize(out, version);
 -            byte[] bytes = out.toByteArray();
 -            if (messageOut.serializedSize(version) + 12 != bytes.length)
 -                throw new AssertionError(String.format("Message 
serializedSize(%s) does not match what was written with serialize(out, %s) for 
verb %s and serializer %s; " +
 -                                                       "expected %s, actual 
%s", version, version, messageOut.verb, messageOut.serializer.getClass(),
 -                                                       
messageOut.serializedSize(version) + 12, bytes.length));
 -            return new MessageImpl(messageOut.verb.ordinal(), bytes, id, 
version, from);
 -        }
 -        catch (IOException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 +        MessagingService.instance().outboundSink.add((message, to) -> {
++            if (isShutdown())
++                return false;
 +            IMessage serialzied = serializeMessage(message.from(), to, 
message);
 +            int fromNum = config.num(); // since this instance is sending the 
message, from will always be this instance
 +            int toNum = 
cluster.get(fromCassandraInetAddressAndPort(to)).config().num();
 +            return cluster.filters().permitOutbound(fromNum, toNum, 
serialzied);
 +        });
      }
  
 -    public static IMessage serializeMessage(MessageIn messageIn, int id, 
InetSocketAddress from, InetSocketAddress to)
 +    public void uncaughtException(Thread thread, Throwable throwable)
      {
 -        try (DataOutputBuffer out = new DataOutputBuffer(1024))
 -        {
 -            int version = 
MessagingService.instance().getVersion(to.getAddress());
 -
 -            out.writeInt(MessagingService.PROTOCOL_MAGIC);
 -            out.writeInt(id);
 -            long timestamp = System.currentTimeMillis();
 -            out.writeInt((int) timestamp);
 -
 -            MessageOut.serialize(out,
 -                                 from.getAddress(),
 -                                 messageIn.verb,
 -                                 messageIn.parameters,
 -                                 messageIn.payload,
 -                                 version);
 -
 -            return new MessageImpl(messageIn.verb.ordinal(), 
out.toByteArray(), id, version, from);
 -        }
 -        catch (IOException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 +        sync(CassandraDaemon::uncaughtException).accept(thread, throwable);
      }
  
 -    private class MessageDeliverySink implements IMessageSink
 +    private static IMessage serializeMessage(InetAddressAndPort from, 
InetAddressAndPort to, Message<?> messageOut)
      {
 -        private final BiConsumer<InetSocketAddress, IMessage> deliver;
 -        private final Function<InetAddress, InetSocketAddress> 
lookupAddressAndPort;
 +        int fromVersion = MessagingService.instance().versions.get(from);
 +        int toVersion = MessagingService.instance().versions.get(to);
  
 -        MessageDeliverySink(BiConsumer<InetSocketAddress, IMessage> deliver,
 -                            Function<InetAddress, InetSocketAddress> 
lookupAddressAndPort)
 +        // If we're re-serializing a pre-4.0 message for filtering purposes, 
take into account possible empty payload
 +        // See CASSANDRA-16157 for details.
 +        if (fromVersion < MessagingService.current_version &&
 +            ((messageOut.verb().serializer() == 
((IVersionedAsymmetricSerializer) NoPayload.serializer) || messageOut.payload 
== null)))
          {
 -            this.deliver = deliver;
 -            this.lookupAddressAndPort = lookupAddressAndPort;
 +            return new MessageImpl(messageOut.verb().id,
 +                                   ByteArrayUtil.EMPTY_BYTE_ARRAY,
 +                                   messageOut.id(),
 +                                   toVersion,
 +                                   fromCassandraInetAddressAndPort(from));
          }
  
 -        public boolean allowOutgoingMessage(MessageOut messageOut, int id, 
InetAddress to)
 +        try (DataOutputBuffer out = new DataOutputBuffer(1024))
          {
 -            InetSocketAddress from = broadcastAddress();
 -            assert from.equals(lookupAddressAndPort.apply(messageOut.from));
 -
 -            // Tracing logic - similar to 
org.apache.cassandra.net.OutboundTcpConnection.writeConnected
 -            byte[] sessionBytes = (byte[]) 
messageOut.parameters.get(Tracing.TRACE_HEADER);
 -            if (sessionBytes != null)
 -            {
 -                UUID sessionId = 
UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
 -                TraceState state = Tracing.instance.get(sessionId);
 -                String message = String.format("Sending %s message to %s", 
messageOut.verb, to);
 -                // session may have already finished; see CASSANDRA-5668
 -                if (state == null)
 -                {
 -                    byte[] traceTypeBytes = (byte[]) 
messageOut.parameters.get(Tracing.TRACE_TYPE);
 -                    Tracing.TraceType traceType = traceTypeBytes == null ? 
Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
 -                    Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), 
message, traceType.getTTL());
 -                }
 -                else
 -                {
 -                    state.trace(message);
 -                    if (messageOut.verb == 
MessagingService.Verb.REQUEST_RESPONSE)
 -                        Tracing.instance.doneWithNonLocalSession(state);
 -                }
 -            }
 -
 -            InetSocketAddress toFull = lookupAddressAndPort.apply(to);
 -            deliver.accept(toFull,
 -                           serializeMessage(messageOut, id, 
broadcastAddress(), toFull));
 -
 -            return false;
 +            Message.serializer.serialize(messageOut, out, toVersion);
 +            byte[] bytes = out.toByteArray();
 +            if (messageOut.serializedSize(toVersion) != bytes.length)
 +                throw new AssertionError(String.format("Message 
serializedSize(%s) does not match what was written with serialize(out, %s) for 
verb %s and serializer %s; " +
 +                                                       "expected %s, actual 
%s", toVersion, toVersion, messageOut.verb(), messageOut.serializer.getClass(),
 +                                                       
messageOut.serializedSize(toVersion), bytes.length));
 +            return new MessageImpl(messageOut.verb().id, bytes, 
messageOut.id(), toVersion, fromCassandraInetAddressAndPort(from));
          }
 -
 -        public boolean allowIncomingMessage(MessageIn message, int id)
 +        catch (IOException e)
          {
 -            // we can filter to our heart's content on the outgoing message; 
no need to worry about incoming
 -            return true;
 +            throw new RuntimeException(e);
          }
      }
  
@@@ -528,13 -592,67 +526,12 @@@
              new File(dir).mkdirs();
      }
  
 -    private static Config loadConfig(IInstanceConfig overrides)
 +    private Config loadConfig(IInstanceConfig overrides)
      {
-         Config config = new Config();
-         overrides.propagate(config, mapper);
-         return config;
+         Map<String,Object> params = ((InstanceConfig) overrides).getParams();
+         return YamlConfigurationLoader.fromMap(params, Config.class);
      }
  
 -    private void initializeRing(ICluster cluster)
 -    {
 -        // This should be done outside instance in order to avoid serializing 
config
 -        String partitionerName = config.getString("partitioner");
 -        List<String> initialTokens = new ArrayList<>();
 -        List<InetSocketAddress> hosts = new ArrayList<>();
 -        List<UUID> hostIds = new ArrayList<>();
 -        for (int i = 1 ; i <= cluster.size() ; ++i)
 -        {
 -            IInstanceConfig config = cluster.get(i).config();
 -            initialTokens.add(config.getString("initial_token"));
 -            hosts.add(config.broadcastAddress());
 -            hostIds.add(config.hostId());
 -        }
 -
 -        try
 -        {
 -            IPartitioner partitioner = 
FBUtilities.newPartitioner(partitionerName);
 -            StorageService storageService = StorageService.instance;
 -            List<Token> tokens = new ArrayList<>();
 -            for (String token : initialTokens)
 -                tokens.add(partitioner.getTokenFactory().fromString(token));
 -
 -            for (int i = 0; i < tokens.size(); i++)
 -            {
 -                InetSocketAddress ep = hosts.get(i);
 -                UUID hostId = hostIds.get(i);
 -                Token token = tokens.get(i);
 -                Gossiper.runInGossipStageBlocking(() -> {
 -                    Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), 
hostId, 1);
 -                    Gossiper.instance.injectApplicationState(ep.getAddress(),
 -                                                             
ApplicationState.TOKENS,
 -                                                             new 
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
 -                    storageService.onChange(ep.getAddress(),
 -                                            ApplicationState.STATUS,
 -                                            new 
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
 -                    Gossiper.instance.realMarkAlive(ep.getAddress(), 
Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
 -                });
 -
 -                int messagingVersion = cluster.get(ep).isShutdown()
 -                                       ? MessagingService.current_version
 -                                       : 
Math.min(MessagingService.current_version, 
cluster.get(ep).getMessagingVersion());
 -                MessagingService.instance().setVersion(ep.getAddress(), 
messagingVersion);
 -            }
 -
 -            // check that all nodes are in token metadata
 -            for (int i = 0; i < tokens.size(); ++i)
 -                assert 
storageService.getTokenMetadata().isMember(hosts.get(i).getAddress());
 -        }
 -        catch (Throwable e) // UnknownHostException
 -        {
 -            throw new RuntimeException(e);
 -        }
 -    }
 -
      public Future<Void> shutdown()
      {
          return shutdown(true);
diff --cc 
test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 2bb486f,af1d962..e243451
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@@ -267,12 -225,12 +231,17 @@@ public class InstanceConfig implements 
          return (String)params.get(name);
      }
  
+     public Map<String, Object> getParams()
+     {
+         return params;
+     }
+ 
 -    public static InstanceConfig generate(int nodeNum, String ipAddress, 
NetworkTopology networkTopology, File root, String token, String seedIp, int 
datadirCount)
 +    public static InstanceConfig generate(int nodeNum,
 +                                          INodeProvisionStrategy 
provisionStrategy,
 +                                          NetworkTopology networkTopology,
 +                                          File root,
 +                                          String token,
 +                                          int datadirCount)
      {
          return new InstanceConfig(nodeNum,
                                    networkTopology,
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java
index 8094190,34b04f7..ebf00d7
--- a/test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java
@@@ -19,21 -19,32 +19,32 @@@
  package org.apache.cassandra.distributed.test;
  
  import java.io.IOException;
+ import java.util.Collections;
  import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;
  
+ import com.google.common.collect.ImmutableMap;
  import org.junit.Test;
  
 -import org.slf4j.LoggerFactory;
 -
++import org.apache.cassandra.concurrent.Stage;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.api.ConsistencyLevel;
  import org.apache.cassandra.distributed.api.Feature;
  import org.apache.cassandra.distributed.api.LogAction;
  import org.apache.cassandra.service.CassandraDaemon;
  import org.apache.cassandra.utils.FBUtilities;
 +import org.assertj.core.api.Assertions;
  
 -import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.row;
++import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
  import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertNotNull;
 -import static org.junit.Assert.assertTrue;
  
  public class JVMDTestTest extends TestBaseImpl
  {
@@@ -69,17 -80,86 +80,94 @@@
              long mark = logs.mark(); // get the current position so watching 
doesn't see any previous exceptions
              cluster.get(2).runOnInstance(() -> {
                  // pretend that an uncaught exception was thrown
 -                LoggerFactory.getLogger(CassandraDaemon.class).error("Error", 
new RuntimeException("fail without fail"));
 +                CassandraDaemon.uncaughtException(Thread.currentThread(), new 
RuntimeException("fail without fail"));
              });
              List<String> errors = logs.watchFor(mark, "^ERROR").getResult();
 -            assertFalse(errors.isEmpty());
 +            Assertions.assertThat(errors)
 +                      // can't check for "fail without fail" since that's on 
the next line, and watchFor doesn't
 +                      // stitch lines together like grepForError does
 +                      .allMatch(s -> s.contains("ERROR"))
 +                      .allMatch(s -> s.contains("isolatedExecutor"))
 +                      .allMatch(s -> s.contains("Exception in thread"))
 +                      .as("Unable to find 'ERROR', 'isolatedExecutor', and 
'Exception in thread'")
 +                      .isNotEmpty();
          }
      }
+ 
+     @Test
+     public void nonSharedConfigClassTest() throws IOException
+     {
+         Map<String,Object> commitLogCompression = 
ImmutableMap.of("class_name", "org.apache.cassandra.io.compress.LZ4Compressor",
+                                                                   
"parameters", Collections.<String,Object>emptyMap());
+         Map<String,Object> encryptionOptions = 
ImmutableMap.of("cipher_suites", Collections.singletonList("FakeCipher"),
+                                                                "optional", 
false,
+                                                                "enabled", 
false);
+ 
+         try (Cluster cluster = Cluster.build(1)
+                                       .withConfig(c -> {
+                                           c.set("concurrent_reads", 321);
+                                           c.set("internode_compression", 
Config.InternodeCompression.dc);
+                                           c.set("client_encryption_options", 
encryptionOptions);
+                                           c.set("commitlog_compression", 
commitLogCompression);
+                                       }).start())
+         {
+             cluster.get(1).runOnInstance(() -> {
 -                assertEquals(321, DatabaseDescriptor.getConcurrentReaders());
++                assertEquals(321, Stage.READ.getMaximumPoolSize());
+                 assertEquals(Config.InternodeCompression.dc, 
DatabaseDescriptor.internodeCompression());
++                assertEquals(Collections.singletonList("FakeCipher"), 
DatabaseDescriptor.getNativeProtocolEncryptionOptions().cipher_suites);
+                 
assertEquals("org.apache.cassandra.io.compress.LZ4Compressor", 
DatabaseDescriptor.getCommitLogCompression().class_name);
+                 
assertTrue(DatabaseDescriptor.getCommitLogCompression().parameters.isEmpty());
+             });
+         }
+     }
+ 
+     @Test
+     public void modifySchemaWithStoppedNode() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.build().withNodes(2).withConfig(c 
-> c.with(Feature.GOSSIP).with(Feature.NETWORK)).start()))
+         {
+             assertFalse(cluster.get(1).isShutdown());
+             assertFalse(cluster.get(2).isShutdown());
+             cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE 
"+KEYSPACE+".tbl1 (id int primary key, i int)");
+ 
+             cluster.get(2).shutdown(true).get(1, TimeUnit.MINUTES);
+             assertFalse(cluster.get(1).isShutdown());
+             assertTrue(cluster.get(2).isShutdown());
+             cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE 
"+KEYSPACE+".tbl2 (id int primary key, i int)");
+ 
+             cluster.get(1).shutdown(true).get(1, TimeUnit.MINUTES);
+             assertTrue(cluster.get(1).isShutdown());
+             assertTrue(cluster.get(2).isShutdown());
+ 
+             // both nodes down, nothing to record a schema change so should 
get an exception
+             Throwable thrown = null;
+             try
+             {
+                 cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " 
+ KEYSPACE + ".tblX (id int primary key, i int)");
+             }
+             catch (Throwable tr)
+             {
+                 thrown = tr;
+             }
+             assertNotNull("Expected to fail with all nodes down", thrown);
+ 
+             // Have to restart instance1 before instance2 as it is hard-coded 
as the seed in in-JVM configuration.
+             cluster.get(1).startup();
+             cluster.get(2).startup();
+             assertFalse(cluster.get(1).isShutdown());
+             assertFalse(cluster.get(2).isShutdown());
+             cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE 
"+KEYSPACE+".tbl3 (id int primary key, i int)");
+ 
+             assertRows(cluster.get(1).executeInternal("SELECT table_name FROM 
system_schema.tables WHERE keyspace_name = ?", KEYSPACE),
+                        row("tbl1"), row("tbl2"), row("tbl3"));
+             assertRows(cluster.get(2).executeInternal("SELECT table_name FROM 
system_schema.tables WHERE keyspace_name = ?", KEYSPACE),
+                        row("tbl1"), row("tbl2"), row("tbl3"));
+ 
+             // Finally test schema can be changed with the first node down
+             cluster.get(1).shutdown(true).get(1, TimeUnit.MINUTES);
+             cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE 
"+KEYSPACE+".tbl4 (id int primary key, i int)");
+             assertRows(cluster.get(2).executeInternal("SELECT table_name FROM 
system_schema.tables WHERE keyspace_name = ?", KEYSPACE),
+                        row("tbl1"), row("tbl2"), row("tbl3"), row("tbl4"));
+         }
+     }
  }
diff --cc test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
index 0000000,4811b4d..f705217
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
+++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
@@@ -1,0 -1,54 +1,52 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.config;
+ 
+ import java.util.Collections;
+ import java.util.Map;
+ 
+ import com.google.common.collect.ImmutableMap;
+ import org.junit.Test;
+ 
+ import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertArrayEquals;
+ 
+ 
+ public class YamlConfigurationLoaderTest
+ {
+     @Test
+     public void fromMapTest()
+     {
+         int storagePort = 123;
+         Config.CommitLogSync commitLogSync = Config.CommitLogSync.batch;
+         ParameterizedClass seedProvider = new 
ParameterizedClass("org.apache.cassandra.locator.SimpleSeedProvider", 
Collections.emptyMap());
 -        EncryptionOptions encryptionOptions = new 
EncryptionOptions.ClientEncryptionOptions();
 -        encryptionOptions.keystore = "myNewKeystore";
 -        encryptionOptions.cipher_suites = new String[] {"SomeCipher"};
 -
++        EncryptionOptions encryptionOptions = new EncryptionOptions()
++                                              .withKeyStore("myNewKeystore")
++                                              .withCipherSuites("SomeCipher")
++                                              .withOptional(false);
+         Map<String,Object> map = ImmutableMap.of("storage_port", storagePort,
+                                                  "commitlog_sync", 
commitLogSync,
+                                                  "seed_provider", 
seedProvider,
+                                                  "client_encryption_options", 
encryptionOptions);
+         Config config = YamlConfigurationLoader.fromMap(map, Config.class);
+         assertEquals(storagePort, config.storage_port); // Check a simple 
integer
+         assertEquals(commitLogSync, config.commitlog_sync); // Check an enum
+         assertEquals(seedProvider, config.seed_provider); // Check a 
parameterized class
 -        assertEquals(encryptionOptions.keystore, 
config.client_encryption_options.keystore); // Check a nested object
 -        assertArrayEquals(encryptionOptions.cipher_suites, 
config.client_encryption_options.cipher_suites);
++        assertEquals(encryptionOptions, config.client_encryption_options); // 
Check a nested object
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to