This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9179796dfadbbf5a60cc7490a9bc6b260b9993b0 Merge: 4fb255d efd6b60 Author: David Capwell <dcapw...@apache.org> AuthorDate: Thu Oct 22 13:24:57 2020 -0700 Merge branch 'cassandra-3.11' into trunk .../cassandra/config/ParameterizedClass.java | 5 ++ .../cassandra/config/YamlConfigurationLoader.java | 21 ++++++ .../distributed/impl/AbstractCluster.java | 44 +++++++++-- .../cassandra/distributed/impl/Instance.java | 17 ++--- .../cassandra/distributed/impl/InstanceConfig.java | 49 +++--------- .../cassandra/distributed/test/JVMDTestTest.java | 88 ++++++++++++++++++++++ .../config/YamlConfigurationLoaderTest.java | 52 +++++++++++++ 7 files changed, 220 insertions(+), 56 deletions(-) diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 44c9744,d053075..36bcb44 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@@ -354,13 -323,18 +354,13 @@@ public abstract class AbstractCluster< private InstanceConfig createInstanceConfig(int nodeNum) { - String ipPrefix = "127.0." + subnet + "."; - String seedIp = ipPrefix + "1"; - String ipAddress = ipPrefix + nodeNum; + INodeProvisionStrategy provisionStrategy = nodeProvisionStrategy.create(subnet); long token = tokenSupplier.token(nodeNum); - - NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology); - - InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp, datadirCount); + NetworkTopology topology = buildNetworkTopology(provisionStrategy, nodeIdTopology); + InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, Long.toString(token), datadirCount); - config.set("dtest.api.cluster_id", clusterId); + config.set("dtest.api.cluster_id", clusterId.toString()); if (configUpdater != null) configUpdater.accept(config); - return config; } diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 04646c1,6ea2c9c..4c778f1 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -42,20 -44,19 +42,21 @@@ import javax.management.ListenerNotFoun import javax.management.Notification; import javax.management.NotificationListener; +import com.google.common.annotations.VisibleForTesting; + +import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.cassandra.batchlog.BatchlogManager; +import org.apache.cassandra.concurrent.ExecutorLocals; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.SharedExecutorPool; -import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; + import org.apache.cassandra.config.YamlConfigurationLoader; import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Memtable; @@@ -132,15 -124,7 +133,8 @@@ import static org.apache.cassandra.dist public class Instance extends IsolatedExecutor implements IInvokableInstance { - private static final Map<Class<?>, Function<Object, Object>> mapper = new HashMap<Class<?>, Function<Object, Object>>() {{ - this.put(IInstanceConfig.ParameterizedClass.class, (obj) -> { - IInstanceConfig.ParameterizedClass pc = (IInstanceConfig.ParameterizedClass) obj; - return new org.apache.cassandra.config.ParameterizedClass(pc.class_name, pc.parameters); - }); - }}; - public final IInstanceConfig config; + private final long startedAt = System.nanoTime(); // should never be invoked directly, so that it is instantiated on other class loader; // only visible for inheritance @@@ -254,70 -235,171 +248,74 @@@ }).run(); } - private void registerMockMessaging(ICluster<IInstance> cluster) + private void registerMockMessaging(ICluster cluster) { - BiConsumer<InetSocketAddress, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message); - BiConsumer<InetSocketAddress, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> { - int fromNum = config().num(); - int toNum = cluster.get(to).config().num(); - - if (cluster.filters().permitOutbound(fromNum, toNum, message) - && cluster.filters().permitInbound(fromNum, toNum, message)) - deliverToInstance.accept(to, message); - }; - - Map<InetAddress, InetSocketAddress> addressAndPortMap = new HashMap<>(); - cluster.stream().forEach(instance -> { - InetSocketAddress addressAndPort = instance.broadcastAddress(); - if (!addressAndPort.equals(instance.config().broadcastAddress())) - throw new IllegalStateException("addressAndPort mismatch: " + addressAndPort + " vs " + instance.config().broadcastAddress()); - InetSocketAddress prev = addressAndPortMap.put(addressAndPort.getAddress(), - addressAndPort); - if (null != prev) - throw new IllegalStateException("This version of Cassandra does not support multiple nodes with the same InetAddress: " + addressAndPort + " vs " + prev); + MessagingService.instance().outboundSink.add((message, to) -> { + InetSocketAddress toAddr = fromCassandraInetAddressAndPort(to); + cluster.get(toAddr).receiveMessage(serializeMessage(message.from(), to, message)); + return false; }); - - MessagingService.instance().addMessageSink(new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get)); } - // unnecessary if registerMockMessaging used - private void registerFilters(ICluster cluster) + private void registerInboundFilter(ICluster cluster) { - IInstance instance = this; - MessagingService.instance().addMessageSink(new IMessageSink() - { - public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress toAddress) - { - if (isShutdown()) - return false; - - // Port is not passed in, so take a best guess at the destination port from this instance - IInstance to = cluster.get(NetworkTopology.addressAndPort(toAddress, - instance.config().broadcastAddress().getPort())); - int fromNum = config().num(); - int toNum = to.config().num(); - return cluster.filters().permitOutbound(fromNum, toNum, serializeMessage(message, id, - broadcastAddress(), - to.config().broadcastAddress())); - } - - public boolean allowIncomingMessage(MessageIn message, int id) - { - if (isShutdown()) - return false; - - // Port is not passed in, so take a best guess at the destination port from this instance - IInstance from = cluster.get(NetworkTopology.addressAndPort(message.from, - instance.config().broadcastAddress().getPort())); - int fromNum = from.config().num(); - int toNum = config().num(); - - IMessage msg = serializeMessage(message, id, from.config().broadcastAddress(), broadcastAddress()); - - return cluster.filters().permitInbound(fromNum, toNum, msg); - } + MessagingService.instance().inboundSink.add(message -> { ++ if (isShutdown()) ++ return false; + IMessage serialized = serializeMessage(message.from(), toCassandraInetAddressAndPort(broadcastAddress()), message); + int fromNum = cluster.get(serialized.from()).config().num(); + int toNum = config.num(); // since this instance is reciving the message, to will always be this instance + return cluster.filters().permitInbound(fromNum, toNum, serialized); }); } - public static IMessage serializeMessage(MessageOut messageOut, int id, InetSocketAddress from, InetSocketAddress to) + private void registerOutboundFilter(ICluster cluster) { - try (DataOutputBuffer out = new DataOutputBuffer(1024)) - { - int version = MessagingService.instance().getVersion(to.getAddress()); - - out.writeInt(MessagingService.PROTOCOL_MAGIC); - out.writeInt(id); - long timestamp = System.currentTimeMillis(); - out.writeInt((int) timestamp); - messageOut.serialize(out, version); - byte[] bytes = out.toByteArray(); - if (messageOut.serializedSize(version) + 12 != bytes.length) - throw new AssertionError(String.format("Message serializedSize(%s) does not match what was written with serialize(out, %s) for verb %s and serializer %s; " + - "expected %s, actual %s", version, version, messageOut.verb, messageOut.serializer.getClass(), - messageOut.serializedSize(version) + 12, bytes.length)); - return new MessageImpl(messageOut.verb.ordinal(), bytes, id, version, from); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + MessagingService.instance().outboundSink.add((message, to) -> { ++ if (isShutdown()) ++ return false; + IMessage serialzied = serializeMessage(message.from(), to, message); + int fromNum = config.num(); // since this instance is sending the message, from will always be this instance + int toNum = cluster.get(fromCassandraInetAddressAndPort(to)).config().num(); + return cluster.filters().permitOutbound(fromNum, toNum, serialzied); + }); } - public static IMessage serializeMessage(MessageIn messageIn, int id, InetSocketAddress from, InetSocketAddress to) + public void uncaughtException(Thread thread, Throwable throwable) { - try (DataOutputBuffer out = new DataOutputBuffer(1024)) - { - int version = MessagingService.instance().getVersion(to.getAddress()); - - out.writeInt(MessagingService.PROTOCOL_MAGIC); - out.writeInt(id); - long timestamp = System.currentTimeMillis(); - out.writeInt((int) timestamp); - - MessageOut.serialize(out, - from.getAddress(), - messageIn.verb, - messageIn.parameters, - messageIn.payload, - version); - - return new MessageImpl(messageIn.verb.ordinal(), out.toByteArray(), id, version, from); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + sync(CassandraDaemon::uncaughtException).accept(thread, throwable); } - private class MessageDeliverySink implements IMessageSink + private static IMessage serializeMessage(InetAddressAndPort from, InetAddressAndPort to, Message<?> messageOut) { - private final BiConsumer<InetSocketAddress, IMessage> deliver; - private final Function<InetAddress, InetSocketAddress> lookupAddressAndPort; + int fromVersion = MessagingService.instance().versions.get(from); + int toVersion = MessagingService.instance().versions.get(to); - MessageDeliverySink(BiConsumer<InetSocketAddress, IMessage> deliver, - Function<InetAddress, InetSocketAddress> lookupAddressAndPort) + // If we're re-serializing a pre-4.0 message for filtering purposes, take into account possible empty payload + // See CASSANDRA-16157 for details. + if (fromVersion < MessagingService.current_version && + ((messageOut.verb().serializer() == ((IVersionedAsymmetricSerializer) NoPayload.serializer) || messageOut.payload == null))) { - this.deliver = deliver; - this.lookupAddressAndPort = lookupAddressAndPort; + return new MessageImpl(messageOut.verb().id, + ByteArrayUtil.EMPTY_BYTE_ARRAY, + messageOut.id(), + toVersion, + fromCassandraInetAddressAndPort(from)); } - public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddress to) + try (DataOutputBuffer out = new DataOutputBuffer(1024)) { - InetSocketAddress from = broadcastAddress(); - assert from.equals(lookupAddressAndPort.apply(messageOut.from)); - - // Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected - byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER); - if (sessionBytes != null) - { - UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); - TraceState state = Tracing.instance.get(sessionId); - String message = String.format("Sending %s message to %s", messageOut.verb, to); - // session may have already finished; see CASSANDRA-5668 - if (state == null) - { - byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE); - Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]); - Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), message, traceType.getTTL()); - } - else - { - state.trace(message); - if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE) - Tracing.instance.doneWithNonLocalSession(state); - } - } - - InetSocketAddress toFull = lookupAddressAndPort.apply(to); - deliver.accept(toFull, - serializeMessage(messageOut, id, broadcastAddress(), toFull)); - - return false; + Message.serializer.serialize(messageOut, out, toVersion); + byte[] bytes = out.toByteArray(); + if (messageOut.serializedSize(toVersion) != bytes.length) + throw new AssertionError(String.format("Message serializedSize(%s) does not match what was written with serialize(out, %s) for verb %s and serializer %s; " + + "expected %s, actual %s", toVersion, toVersion, messageOut.verb(), messageOut.serializer.getClass(), + messageOut.serializedSize(toVersion), bytes.length)); + return new MessageImpl(messageOut.verb().id, bytes, messageOut.id(), toVersion, fromCassandraInetAddressAndPort(from)); } - - public boolean allowIncomingMessage(MessageIn message, int id) + catch (IOException e) { - // we can filter to our heart's content on the outgoing message; no need to worry about incoming - return true; + throw new RuntimeException(e); } } @@@ -528,13 -592,67 +526,12 @@@ new File(dir).mkdirs(); } - private static Config loadConfig(IInstanceConfig overrides) + private Config loadConfig(IInstanceConfig overrides) { - Config config = new Config(); - overrides.propagate(config, mapper); - return config; + Map<String,Object> params = ((InstanceConfig) overrides).getParams(); + return YamlConfigurationLoader.fromMap(params, Config.class); } - private void initializeRing(ICluster cluster) - { - // This should be done outside instance in order to avoid serializing config - String partitionerName = config.getString("partitioner"); - List<String> initialTokens = new ArrayList<>(); - List<InetSocketAddress> hosts = new ArrayList<>(); - List<UUID> hostIds = new ArrayList<>(); - for (int i = 1 ; i <= cluster.size() ; ++i) - { - IInstanceConfig config = cluster.get(i).config(); - initialTokens.add(config.getString("initial_token")); - hosts.add(config.broadcastAddress()); - hostIds.add(config.hostId()); - } - - try - { - IPartitioner partitioner = FBUtilities.newPartitioner(partitionerName); - StorageService storageService = StorageService.instance; - List<Token> tokens = new ArrayList<>(); - for (String token : initialTokens) - tokens.add(partitioner.getTokenFactory().fromString(token)); - - for (int i = 0; i < tokens.size(); i++) - { - InetSocketAddress ep = hosts.get(i); - UUID hostId = hostIds.get(i); - Token token = tokens.get(i); - Gossiper.runInGossipStageBlocking(() -> { - Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1); - Gossiper.instance.injectApplicationState(ep.getAddress(), - ApplicationState.TOKENS, - new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token))); - storageService.onChange(ep.getAddress(), - ApplicationState.STATUS, - new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token))); - Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress())); - }); - - int messagingVersion = cluster.get(ep).isShutdown() - ? MessagingService.current_version - : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion()); - MessagingService.instance().setVersion(ep.getAddress(), messagingVersion); - } - - // check that all nodes are in token metadata - for (int i = 0; i < tokens.size(); ++i) - assert storageService.getTokenMetadata().isMember(hosts.get(i).getAddress()); - } - catch (Throwable e) // UnknownHostException - { - throw new RuntimeException(e); - } - } - public Future<Void> shutdown() { return shutdown(true); diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index 2bb486f,af1d962..e243451 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@@ -267,12 -225,12 +231,17 @@@ public class InstanceConfig implements return (String)params.get(name); } + public Map<String, Object> getParams() + { + return params; + } + - public static InstanceConfig generate(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp, int datadirCount) + public static InstanceConfig generate(int nodeNum, + INodeProvisionStrategy provisionStrategy, + NetworkTopology networkTopology, + File root, + String token, + int datadirCount) { return new InstanceConfig(nodeNum, networkTopology, diff --cc test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java index 8094190,34b04f7..ebf00d7 --- a/test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/JVMDTestTest.java @@@ -19,21 -19,32 +19,32 @@@ package org.apache.cassandra.distributed.test; import java.io.IOException; + import java.util.Collections; import java.util.List; + import java.util.Map; + import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + import com.google.common.collect.ImmutableMap; import org.junit.Test; -import org.slf4j.LoggerFactory; - ++import org.apache.cassandra.concurrent.Stage; + import org.apache.cassandra.config.Config; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.LogAction; import org.apache.cassandra.service.CassandraDaemon; import org.apache.cassandra.utils.FBUtilities; +import org.assertj.core.api.Assertions; -import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; + import static org.apache.cassandra.distributed.shared.AssertUtils.row; ++import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; public class JVMDTestTest extends TestBaseImpl { @@@ -69,17 -80,86 +80,94 @@@ long mark = logs.mark(); // get the current position so watching doesn't see any previous exceptions cluster.get(2).runOnInstance(() -> { // pretend that an uncaught exception was thrown - LoggerFactory.getLogger(CassandraDaemon.class).error("Error", new RuntimeException("fail without fail")); + CassandraDaemon.uncaughtException(Thread.currentThread(), new RuntimeException("fail without fail")); }); List<String> errors = logs.watchFor(mark, "^ERROR").getResult(); - assertFalse(errors.isEmpty()); + Assertions.assertThat(errors) + // can't check for "fail without fail" since thats on the next line, and watchFor doesn't + // stitch lines together like grepForError does + .allMatch(s -> s.contains("ERROR")) + .allMatch(s -> s.contains("isolatedExecutor")) + .allMatch(s -> s.contains("Exception in thread")) + .as("Unable to find 'ERROR', 'isolatedExecutor', and 'Exception in thread'") + .isNotEmpty(); } } + + @Test + public void nonSharedConfigClassTest() throws IOException + { + Map<String,Object> commitLogCompression = ImmutableMap.of("class_name", "org.apache.cassandra.io.compress.LZ4Compressor", + "parameters", Collections.<String,Object>emptyMap()); + Map<String,Object> encryptionOptions = ImmutableMap.of("cipher_suites", Collections.singletonList("FakeCipher"), + "optional", false, + "enabled", false); + + try (Cluster cluster = Cluster.build(1) + .withConfig(c -> { + c.set("concurrent_reads", 321); + c.set("internode_compression", Config.InternodeCompression.dc); + c.set("client_encryption_options", encryptionOptions); + c.set("commitlog_compression", commitLogCompression); + }).start()) + { + cluster.get(1).runOnInstance(() -> { - assertEquals(321, DatabaseDescriptor.getConcurrentReaders()); ++ assertEquals(321, Stage.READ.getMaximumPoolSize()); + assertEquals(Config.InternodeCompression.dc, DatabaseDescriptor.internodeCompression()); ++ assertEquals(Collections.singletonList("FakeCipher"), DatabaseDescriptor.getNativeProtocolEncryptionOptions().cipher_suites); + assertEquals("org.apache.cassandra.io.compress.LZ4Compressor", DatabaseDescriptor.getCommitLogCompression().class_name); + assertTrue(DatabaseDescriptor.getCommitLogCompression().parameters.isEmpty()); + }); + } + } + + @Test + public void modifySchemaWithStoppedNode() throws Throwable + { + try (Cluster cluster = init(Cluster.build().withNodes(2).withConfig(c -> c.with(Feature.GOSSIP).with(Feature.NETWORK)).start())) + { + assertFalse(cluster.get(1).isShutdown()); + assertFalse(cluster.get(2).isShutdown()); + cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE "+KEYSPACE+".tbl1 (id int primary key, i int)"); + + cluster.get(2).shutdown(true).get(1, TimeUnit.MINUTES); + assertFalse(cluster.get(1).isShutdown()); + assertTrue(cluster.get(2).isShutdown()); + cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE "+KEYSPACE+".tbl2 (id int primary key, i int)"); + + cluster.get(1).shutdown(true).get(1, TimeUnit.MINUTES); + assertTrue(cluster.get(1).isShutdown()); + assertTrue(cluster.get(2).isShutdown()); + + // both nodes down, nothing to record a schema change so should get an exception + Throwable thrown = null; + try + { + cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + KEYSPACE + ".tblX (id int primary key, i int)"); + } + catch (Throwable tr) + { + thrown = tr; + } + assertNotNull("Expected to fail with all nodes down", thrown); + + // Have to restart instance1 before instance2 as it is hard-coded as the seed in in-JVM configuration. + cluster.get(1).startup(); + cluster.get(2).startup(); + assertFalse(cluster.get(1).isShutdown()); + assertFalse(cluster.get(2).isShutdown()); + cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE "+KEYSPACE+".tbl3 (id int primary key, i int)"); + + assertRows(cluster.get(1).executeInternal("SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?", KEYSPACE), + row("tbl1"), row("tbl2"), row("tbl3")); + assertRows(cluster.get(2).executeInternal("SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?", KEYSPACE), + row("tbl1"), row("tbl2"), row("tbl3")); + + // Finally test schema can be changed with the first node down + cluster.get(1).shutdown(true).get(1, TimeUnit.MINUTES); + cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE "+KEYSPACE+".tbl4 (id int primary key, i int)"); + assertRows(cluster.get(2).executeInternal("SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?", KEYSPACE), + row("tbl1"), row("tbl2"), row("tbl3"), row("tbl4")); + } + } } diff --cc test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java index 0000000,4811b4d..f705217 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java +++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java @@@ -1,0 -1,54 +1,52 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.config; + + import java.util.Collections; + import java.util.Map; + + import com.google.common.collect.ImmutableMap; + import org.junit.Test; + + import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertArrayEquals; + + + public class YamlConfigurationLoaderTest + { + @Test + public void fromMapTest() + { + int storagePort = 123; + Config.CommitLogSync commitLogSync = Config.CommitLogSync.batch; + ParameterizedClass seedProvider = new ParameterizedClass("org.apache.cassandra.locator.SimpleSeedProvider", Collections.emptyMap()); - EncryptionOptions encryptionOptions = new EncryptionOptions.ClientEncryptionOptions(); - encryptionOptions.keystore = "myNewKeystore"; - encryptionOptions.cipher_suites = new String[] {"SomeCipher"}; - ++ EncryptionOptions encryptionOptions = new EncryptionOptions() ++ .withKeyStore("myNewKeystore") ++ .withCipherSuites("SomeCipher") ++ .withOptional(false); + Map<String,Object> map = ImmutableMap.of("storage_port", storagePort, + "commitlog_sync", commitLogSync, + "seed_provider", seedProvider, + "client_encryption_options", encryptionOptions); + Config config = YamlConfigurationLoader.fromMap(map, Config.class); + assertEquals(storagePort, config.storage_port); // Check a simple integer + assertEquals(commitLogSync, config.commitlog_sync); // Check an enum + assertEquals(seedProvider, config.seed_provider); // Check a parameterized class - assertEquals(encryptionOptions.keystore, config.client_encryption_options.keystore); // Check a nested object - assertArrayEquals(encryptionOptions.cipher_suites, config.client_encryption_options.cipher_suites); ++ assertEquals(encryptionOptions, config.client_encryption_options); // Check a nested object + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org