Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 0e652e754 -> eea042b0b refs/heads/trunk 477c54c03 -> 14b1c9607
cassandra-stress supports whitelist mode for node config patch by benedict; reviewed by tjake for CASSANDRA-7658 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eea042b0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eea042b0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eea042b0 Branch: refs/heads/cassandra-2.1 Commit: eea042b0b0abfb09f60b672c8930a924c5d7f25b Parents: 0e652e7 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Sun Sep 14 09:13:18 2014 +0100 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Sun Sep 14 09:13:18 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/stress/settings/SettingsNode.java | 47 ++++++++++++++- .../stress/settings/StressSettings.java | 5 +- .../cassandra/stress/util/JavaDriverClient.java | 16 ++++- .../stress/util/SmartThriftClient.java | 62 ++++++++++++++------ 5 files changed, 106 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4c39f5c..7e18719 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.1 + * cassandra-stress supports whitelist mode for node config * GCInspector more closely tracks GC; cassandra-stress and nodetool report it * nodetool won't output bogus ownership info without a keyspace (CASSANDRA-7173) * Add human readable option to nodetool commands (CASSANDRA-5433) http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java index 4fd7d34..30fe908 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java @@ -22,15 +22,20 @@ package org.apache.cassandra.stress.settings; import java.io.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class SettingsNode implements Serializable { - public final List<String> nodes; + public final boolean isWhiteList; public SettingsNode(Options options) { @@ -63,6 +68,41 @@ public class SettingsNode implements Serializable } else nodes = Arrays.asList(options.list.value().split(",")); + isWhiteList = options.whitelist.setByUser(); + } + + public Set<InetAddress> resolveAll() + { + Set<InetAddress> r = new HashSet<>(); + for (String node : nodes) + { + try + { + r.add(InetAddress.getByName(node)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + return r; + } + + public Set<InetSocketAddress> resolveAll(int port) + { + Set<InetSocketAddress> r = new HashSet<>(); + for (String node : nodes) + { + try + { + r.add(new InetSocketAddress(InetAddress.getByName(node), port)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + return r; } public String randomNode() @@ -77,13 +117,14 @@ public class SettingsNode implements Serializable public static final class Options extends GroupedOptions { + final OptionSimple whitelist = new OptionSimple("whitelist", "", null, "Limit communications to the provided nodes", false); final OptionSimple file = new OptionSimple("file=", ".*", null, "Node file (one per line)", false); - final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", "localhost", "comma delimited list of hosts", false); + final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", "localhost", "comma delimited list of nodes", false); @Override public List<? extends Option> options() { - return Arrays.asList(file, list); + return Arrays.asList(whitelist, file, list); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java index bdd10e5..ba72821 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java @@ -25,6 +25,9 @@ import java.io.Serializable; import java.util.*; import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.WhiteListPolicy; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.stress.util.JavaDriverClient; import org.apache.cassandra.stress.util.SimpleThriftClient; @@ -177,7 +180,7 @@ public class StressSettings implements Serializable return client; EncryptionOptions.ClientEncryptionOptions encOptions = transport.getEncryptionOptions(); - JavaDriverClient c = new JavaDriverClient(currentNode, port.nativePort, encOptions); + JavaDriverClient c = new JavaDriverClient(this, currentNode, port.nativePort, encOptions); c.connect(mode.compression()); if (setKeyspace) c.execute("USE \"" + schema.keyspace + "\";", org.apache.cassandra.db.ConsistencyLevel.ONE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java index c901461..2105179 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java @@ -23,10 +23,13 @@ import java.util.concurrent.ConcurrentMap; import javax.net.ssl.SSLContext; import com.datastax.driver.core.*; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.WhiteListPolicy; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.security.SSLFactory; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.Slf4JLoggerFactory; +import org.apache.cassandra.stress.settings.StressSettings; public class JavaDriverClient { @@ -41,19 +44,24 @@ public class JavaDriverClient private final EncryptionOptions.ClientEncryptionOptions encryptionOptions; private Cluster cluster; private Session session; + private final WhiteListPolicy whitelist; private static final ConcurrentMap<String, PreparedStatement> stmts = new ConcurrentHashMap<>(); - public JavaDriverClient(String host, int port) + public JavaDriverClient(StressSettings settings, String host, int port) { - this(host, port, new EncryptionOptions.ClientEncryptionOptions()); + this(settings, host, port, new EncryptionOptions.ClientEncryptionOptions()); } - public JavaDriverClient(String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions) + public JavaDriverClient(StressSettings settings, String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions) { this.host = host; this.port = port; this.encryptionOptions = encryptionOptions; + if (settings.node.isWhiteList) + whitelist = new WhiteListPolicy(new DCAwareRoundRobinPolicy(), settings.node.resolveAll(settings.port.nativePort)); + else + whitelist = null; } public PreparedStatement prepare(String query) @@ -78,6 +86,8 @@ public class JavaDriverClient .addContactPoint(host) .withPort(port) .withoutMetrics(); // The driver uses metrics 3 with conflict with our version + if (whitelist != null) + clusterBuilder.withLoadBalancingPolicy(whitelist); clusterBuilder.withCompression(compression); if (encryptionOptions.enabled) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java b/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java index 7ede496..b880283 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java @@ -21,10 +21,13 @@ package org.apache.cassandra.stress.util; */ +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import com.datastax.driver.core.Host; @@ -41,17 +44,29 @@ public class SmartThriftClient implements ThriftClient final String keyspace; final Metadata metadata; final StressSettings settings; - final ConcurrentHashMap<Host, ConcurrentLinkedQueue<Client>> cache = new ConcurrentHashMap<>(); + final ConcurrentHashMap<InetAddress, ConcurrentLinkedQueue<Client>> cache = new ConcurrentHashMap<>(); final AtomicInteger queryIdCounter = new AtomicInteger(); final ConcurrentHashMap<Integer, String> queryStrings = new ConcurrentHashMap<>(); final ConcurrentHashMap<String, Integer> queryIds = new ConcurrentHashMap<>(); + final Set<InetAddress> whiteset; + final List<InetAddress> whitelist; public SmartThriftClient(StressSettings settings, String keyspace, Metadata metadata) { this.metadata = metadata; this.keyspace = keyspace; this.settings = settings; + if (!settings.node.isWhiteList) + { + whiteset = null; + whitelist = null; + } + else + { + whiteset = settings.node.resolveAll(); + whitelist = Arrays.asList(whiteset.toArray(new InetAddress[0])); + } } private final AtomicInteger roundrobin = new AtomicInteger(); @@ -73,13 +88,13 @@ public class SmartThriftClient implements ThriftClient final class Client { final Cassandra.Client client; - final Host host; + final InetAddress server; final Map<Integer, Integer> queryMap = new HashMap<>(); - Client(Cassandra.Client client, Host host) + Client(Cassandra.Client client, InetAddress server) { this.client = client; - this.host = host; + this.server = server; } Integer get(Integer id, boolean cql3) throws TException @@ -111,22 +126,33 @@ public class SmartThriftClient implements ThriftClient private Client get(ByteBuffer pk) { Set<Host> hosts = metadata.getReplicas(metadata.quote(keyspace), pk); - int pos = roundrobin.incrementAndGet() % hosts.size(); - if (pos < 0) - pos = -pos; - Host host = Iterators.get(hosts.iterator(), pos); - ConcurrentLinkedQueue<Client> q = cache.get(host); + InetAddress address = null; + if (hosts.size() > 0) + { + int pos = roundrobin.incrementAndGet() % hosts.size(); + for (int i = 0 ; address == null && i < hosts.size() ; i++) + { + if (pos < 0) + pos = -pos; + Host host = Iterators.get(hosts.iterator(), (pos + i) % hosts.size()); + if (whiteset == null || whiteset.contains(host.getAddress())) + address = host.getAddress(); + } + } + if (address == null) + address = whitelist.get(ThreadLocalRandom.current().nextInt(whitelist.size())); + ConcurrentLinkedQueue<Client> q = cache.get(address); if (q == null) { ConcurrentLinkedQueue<Client> newQ = new ConcurrentLinkedQueue<Client>(); - q = cache.putIfAbsent(host, newQ); + q = cache.putIfAbsent(address, newQ); if (q == null) q = newQ; } Client tclient = q.poll(); if (tclient != null) return tclient; - return new Client(settings.getRawThriftClient(host.getAddress().getHostAddress()), host); + return new Client(settings.getRawThriftClient(address.getHostAddress()), address); } @Override @@ -140,7 +166,7 @@ public class SmartThriftClient implements ThriftClient client.client.batch_mutate(Collections.singletonMap(e.getKey(), e.getValue()), consistencyLevel); } finally { - cache.get(client.host).add(client); + cache.get(client.server).add(client); } } } @@ -154,7 +180,7 @@ public class SmartThriftClient implements ThriftClient return client.client.get_slice(key, parent, predicate, consistencyLevel); } finally { - cache.get(client.host).add(client); + cache.get(client.server).add(client); } } @@ -167,7 +193,7 @@ public class SmartThriftClient implements ThriftClient client.client.insert(key, column_parent, column, consistency_level); } finally { - cache.get(client.host).add(client); + cache.get(client.server).add(client); } } @@ -180,7 +206,7 @@ public class SmartThriftClient implements ThriftClient return client.client.execute_cql_query(ByteBufferUtil.bytes(query), compression); } finally { - cache.get(client.host).add(client); + cache.get(client.server).add(client); } } @@ -193,7 +219,7 @@ public class SmartThriftClient implements ThriftClient return client.client.execute_cql3_query(ByteBufferUtil.bytes(query), compression, consistency); } finally { - cache.get(client.host).add(client); + cache.get(client.server).add(client); } } @@ -212,7 +238,7 @@ public class SmartThriftClient implements ThriftClient return client.client.execute_prepared_cql3_query(client.get(queryId, true), values, consistency); } finally { - cache.get(client.host).add(client); + cache.get(client.server).add(client); } } @@ -231,7 +257,7 @@ public class SmartThriftClient implements ThriftClient return client.client.execute_prepared_cql_query(client.get(queryId, true), values); } finally { - cache.get(client.host).add(client); + cache.get(client.server).add(client); } }