Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 0e652e754 -> eea042b0b
  refs/heads/trunk 477c54c03 -> 14b1c9607


cassandra-stress supports whitelist mode for node config

patch by benedict; reviewed by tjake for CASSANDRA-7658


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eea042b0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eea042b0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eea042b0

Branch: refs/heads/cassandra-2.1
Commit: eea042b0b0abfb09f60b672c8930a924c5d7f25b
Parents: 0e652e7
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Sun Sep 14 09:13:18 2014 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Sun Sep 14 09:13:18 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/stress/settings/SettingsNode.java | 47 ++++++++++++++-
 .../stress/settings/StressSettings.java         |  5 +-
 .../cassandra/stress/util/JavaDriverClient.java | 16 ++++-
 .../stress/util/SmartThriftClient.java          | 62 ++++++++++++++------
 5 files changed, 106 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4c39f5c..7e18719 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * cassandra-stress supports whitelist mode for node config
  * GCInspector more closely tracks GC; cassandra-stress and nodetool report it
  * nodetool won't output bogus ownership info without a keyspace 
(CASSANDRA-7173)
  * Add human readable option to nodetool commands (CASSANDRA-5433)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java 
b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
index 4fd7d34..30fe908 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
@@ -22,15 +22,20 @@ package org.apache.cassandra.stress.settings;
 
 
 import java.io.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class SettingsNode implements Serializable
 {
-
     public final List<String> nodes;
+    public final boolean isWhiteList;
 
     public SettingsNode(Options options)
     {
@@ -63,6 +68,41 @@ public class SettingsNode implements Serializable
         }
         else
             nodes = Arrays.asList(options.list.value().split(","));
+        isWhiteList = options.whitelist.setByUser();
+    }
+
+    public Set<InetAddress> resolveAll()
+    {
+        Set<InetAddress> r = new HashSet<>();
+        for (String node : nodes)
+        {
+            try
+            {
+                r.add(InetAddress.getByName(node));
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        return r;
+    }
+
+    public Set<InetSocketAddress> resolveAll(int port)
+    {
+        Set<InetSocketAddress> r = new HashSet<>();
+        for (String node : nodes)
+        {
+            try
+            {
+                r.add(new InetSocketAddress(InetAddress.getByName(node), 
port));
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        return r;
     }
 
     public String randomNode()
@@ -77,13 +117,14 @@ public class SettingsNode implements Serializable
 
     public static final class Options extends GroupedOptions
     {
+        final OptionSimple whitelist = new OptionSimple("whitelist", "", null, 
"Limit communications to the provided nodes", false);
         final OptionSimple file = new OptionSimple("file=", ".*", null, "Node 
file (one per line)", false);
-        final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", 
"localhost", "comma delimited list of hosts", false);
+        final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", 
"localhost", "comma delimited list of nodes", false);
 
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(file, list);
+            return Arrays.asList(whitelist, file, list);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java 
b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index bdd10e5..ba72821 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -25,6 +25,9 @@ import java.io.Serializable;
 import java.util.*;
 
 import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.SimpleThriftClient;
@@ -177,7 +180,7 @@ public class StressSettings implements Serializable
                     return client;
 
                 EncryptionOptions.ClientEncryptionOptions encOptions = 
transport.getEncryptionOptions();
-                JavaDriverClient c = new JavaDriverClient(currentNode, 
port.nativePort, encOptions);
+                JavaDriverClient c = new JavaDriverClient(this, currentNode, 
port.nativePort, encOptions);
                 c.connect(mode.compression());
                 if (setKeyspace)
                     c.execute("USE \"" + schema.keyspace + "\";", 
org.apache.cassandra.db.ConsistencyLevel.ONE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java 
b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
index c901461..2105179 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -23,10 +23,13 @@ import java.util.concurrent.ConcurrentMap;
 import javax.net.ssl.SSLContext;
 
 import com.datastax.driver.core.*;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.security.SSLFactory;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 import io.netty.util.internal.logging.Slf4JLoggerFactory;
+import org.apache.cassandra.stress.settings.StressSettings;
 
 public class JavaDriverClient
 {
@@ -41,19 +44,24 @@ public class JavaDriverClient
     private final EncryptionOptions.ClientEncryptionOptions encryptionOptions;
     private Cluster cluster;
     private Session session;
+    private final WhiteListPolicy whitelist;
 
     private static final ConcurrentMap<String, PreparedStatement> stmts = new 
ConcurrentHashMap<>();
 
-    public JavaDriverClient(String host, int port)
+    public JavaDriverClient(StressSettings settings, String host, int port)
     {
-        this(host, port, new EncryptionOptions.ClientEncryptionOptions());
+        this(settings, host, port, new 
EncryptionOptions.ClientEncryptionOptions());
     }
 
-    public JavaDriverClient(String host, int port, 
EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+    public JavaDriverClient(StressSettings settings, String host, int port, 
EncryptionOptions.ClientEncryptionOptions encryptionOptions)
     {
         this.host = host;
         this.port = port;
         this.encryptionOptions = encryptionOptions;
+        if (settings.node.isWhiteList)
+            whitelist = new WhiteListPolicy(new DCAwareRoundRobinPolicy(), 
settings.node.resolveAll(settings.port.nativePort));
+        else
+            whitelist = null;
     }
 
     public PreparedStatement prepare(String query)
@@ -78,6 +86,8 @@ public class JavaDriverClient
                                                 .addContactPoint(host)
                                                 .withPort(port)
                                                 .withoutMetrics(); // The 
driver uses metrics 3 with conflict with our version
+        if (whitelist != null)
+            clusterBuilder.withLoadBalancingPolicy(whitelist);
         clusterBuilder.withCompression(compression);
         if (encryptionOptions.enabled)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java 
b/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
index 7ede496..b880283 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
@@ -21,10 +21,13 @@ package org.apache.cassandra.stress.util;
  */
 
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.datastax.driver.core.Host;
@@ -41,17 +44,29 @@ public class SmartThriftClient implements ThriftClient
     final String keyspace;
     final Metadata metadata;
     final StressSettings settings;
-    final ConcurrentHashMap<Host, ConcurrentLinkedQueue<Client>> cache = new 
ConcurrentHashMap<>();
+    final ConcurrentHashMap<InetAddress, ConcurrentLinkedQueue<Client>> cache 
= new ConcurrentHashMap<>();
 
     final AtomicInteger queryIdCounter = new AtomicInteger();
     final ConcurrentHashMap<Integer, String> queryStrings = new 
ConcurrentHashMap<>();
     final ConcurrentHashMap<String, Integer> queryIds = new 
ConcurrentHashMap<>();
+    final Set<InetAddress> whiteset;
+    final List<InetAddress> whitelist;
 
     public SmartThriftClient(StressSettings settings, String keyspace, 
Metadata metadata)
     {
         this.metadata = metadata;
         this.keyspace = keyspace;
         this.settings = settings;
+        if (!settings.node.isWhiteList)
+        {
+            whiteset = null;
+            whitelist = null;
+        }
+        else
+        {
+            whiteset = settings.node.resolveAll();
+            whitelist = Arrays.asList(whiteset.toArray(new InetAddress[0]));
+        }
     }
 
     private final AtomicInteger roundrobin = new AtomicInteger();
@@ -73,13 +88,13 @@ public class SmartThriftClient implements ThriftClient
     final class Client
     {
         final Cassandra.Client client;
-        final Host host;
+        final InetAddress server;
         final Map<Integer, Integer> queryMap = new HashMap<>();
 
-        Client(Cassandra.Client client, Host host)
+        Client(Cassandra.Client client, InetAddress server)
         {
             this.client = client;
-            this.host = host;
+            this.server = server;
         }
 
         Integer get(Integer id, boolean cql3) throws TException
@@ -111,22 +126,33 @@ public class SmartThriftClient implements ThriftClient
     private Client get(ByteBuffer pk)
     {
         Set<Host> hosts = metadata.getReplicas(metadata.quote(keyspace), pk);
-        int pos = roundrobin.incrementAndGet() % hosts.size();
-        if (pos < 0)
-            pos = -pos;
-        Host host = Iterators.get(hosts.iterator(), pos);
-        ConcurrentLinkedQueue<Client> q = cache.get(host);
+        InetAddress address = null;
+        if (hosts.size() > 0)
+        {
+            int pos = roundrobin.incrementAndGet() % hosts.size();
+            for (int i = 0 ; address == null && i < hosts.size() ; i++)
+            {
+                if (pos < 0)
+                    pos = -pos;
+                Host host = Iterators.get(hosts.iterator(), (pos + i) % 
hosts.size());
+                if (whiteset == null || whiteset.contains(host.getAddress()))
+                    address = host.getAddress();
+            }
+        }
+        if (address == null)
+            address = 
whitelist.get(ThreadLocalRandom.current().nextInt(whitelist.size()));
+        ConcurrentLinkedQueue<Client> q = cache.get(address);
         if (q == null)
         {
             ConcurrentLinkedQueue<Client> newQ = new 
ConcurrentLinkedQueue<Client>();
-            q = cache.putIfAbsent(host, newQ);
+            q = cache.putIfAbsent(address, newQ);
             if (q == null)
                 q = newQ;
         }
         Client tclient = q.poll();
         if (tclient != null)
             return tclient;
-        return new 
Client(settings.getRawThriftClient(host.getAddress().getHostAddress()), host);
+        return new 
Client(settings.getRawThriftClient(address.getHostAddress()), address);
     }
 
     @Override
@@ -140,7 +166,7 @@ public class SmartThriftClient implements ThriftClient
                 
client.client.batch_mutate(Collections.singletonMap(e.getKey(), e.getValue()), 
consistencyLevel);
             } finally
             {
-                cache.get(client.host).add(client);
+                cache.get(client.server).add(client);
             }
         }
     }
@@ -154,7 +180,7 @@ public class SmartThriftClient implements ThriftClient
             return client.client.get_slice(key, parent, predicate, 
consistencyLevel);
         } finally
         {
-            cache.get(client.host).add(client);
+            cache.get(client.server).add(client);
         }
     }
 
@@ -167,7 +193,7 @@ public class SmartThriftClient implements ThriftClient
             client.client.insert(key, column_parent, column, 
consistency_level);
         } finally
         {
-            cache.get(client.host).add(client);
+            cache.get(client.server).add(client);
         }
     }
 
@@ -180,7 +206,7 @@ public class SmartThriftClient implements ThriftClient
             return 
client.client.execute_cql_query(ByteBufferUtil.bytes(query), compression);
         } finally
         {
-            cache.get(client.host).add(client);
+            cache.get(client.server).add(client);
         }
     }
 
@@ -193,7 +219,7 @@ public class SmartThriftClient implements ThriftClient
             return 
client.client.execute_cql3_query(ByteBufferUtil.bytes(query), compression, 
consistency);
         } finally
         {
-            cache.get(client.host).add(client);
+            cache.get(client.server).add(client);
         }
     }
 
@@ -212,7 +238,7 @@ public class SmartThriftClient implements ThriftClient
             return 
client.client.execute_prepared_cql3_query(client.get(queryId, true), values, 
consistency);
         } finally
         {
-            cache.get(client.host).add(client);
+            cache.get(client.server).add(client);
         }
     }
 
@@ -231,7 +257,7 @@ public class SmartThriftClient implements ThriftClient
             return 
client.client.execute_prepared_cql_query(client.get(queryId, true), values);
         } finally
         {
-            cache.get(client.host).add(client);
+            cache.get(client.server).add(client);
         }
     }
 

Reply via email to