Switch stress to use ITransportFactory patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-6641
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7b23cc7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7b23cc7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7b23cc7 Branch: refs/heads/trunk Commit: e7b23cc741622ccf8b1487d8622cca47dcb9cc34 Parents: 36af409 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Fri Jan 31 01:38:54 2014 -0500 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Jan 31 01:38:54 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/cli/CliOptions.java | 45 +++++++++++-- .../org/apache/cassandra/stress/Session.java | 69 +++++++++++++------- 3 files changed, 88 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b23cc7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 56a72ef..94b21d4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,7 @@ * Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637) * SSTableScanner may skip rows during cleanup (CASSANDRA-6638) * sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503) + * Switch stress to use ITransportFactory (CASSANDRA-6641) Merged from 1.2: * fsync compression metadata (CASSANDRA-6531) * Validate CF existence on execution for prepared statement (CASSANDRA-6535) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b23cc7/src/java/org/apache/cassandra/cli/CliOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliOptions.java b/src/java/org/apache/cassandra/cli/CliOptions.java index 68f17c9..7894bf9 100644 --- a/src/java/org/apache/cassandra/cli/CliOptions.java +++ b/src/java/org/apache/cassandra/cli/CliOptions.java @@ -17,9 +17,15 @@ */ package org.apache.cassandra.cli; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.base.Joiner; import org.apache.commons.cli.*; +import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.thrift.ITransportFactory; +import org.apache.cassandra.thrift.SSLTransportFactory; /** * @@ -114,11 +120,6 @@ public class CliOptions css.hostName = DEFAULT_HOST; } - if (cmd.hasOption(TRANSPORT_FACTORY)) - { - css.transportFactory = validateAndSetTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY)); - } - if (cmd.hasOption(DEBUG_OPTION)) { css.debug = true; @@ -217,6 +218,12 @@ public class CliOptions css.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(","); } + if (cmd.hasOption(TRANSPORT_FACTORY)) + { + css.transportFactory = validateAndSetTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY)); + configureTransportFactory(css.transportFactory, css.encOptions); + } + // Abort if there are any unrecognized arguments left if (cmd.getArgs().length > 0) { @@ -281,4 +288,32 @@ public class CliOptions throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e); } } + + private static void configureTransportFactory(ITransportFactory transportFactory, EncryptionOptions encOptions) + { + Map<String, String> options = new HashMap<>(); + // If the supplied factory supports the same set of options as our SSL impl, set those + if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE)) + options.put(SSLTransportFactory.TRUSTSTORE, encOptions.truststore); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD)) + options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, encOptions.truststore_password); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL)) + options.put(SSLTransportFactory.PROTOCOL, encOptions.protocol); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES)) + options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(encOptions.cipher_suites)); + + if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE) + && encOptions.require_client_auth) + options.put(SSLTransportFactory.KEYSTORE, encOptions.keystore); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD) + && encOptions.require_client_auth) + options.put(SSLTransportFactory.KEYSTORE_PASSWORD, encOptions.keystore_password); + + // Now check if any of the factory's supported options are set as system properties + for (String optionKey : transportFactory.supportedOptions()) + if (System.getProperty(optionKey) != null) + options.put(optionKey, System.getProperty(optionKey)); + + transportFactory.setOptions(options); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b23cc7/tools/stress/src/org/apache/cassandra/stress/Session.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java index 242fa14..9ac865d 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Session.java +++ b/tools/stress/src/org/apache/cassandra/stress/Session.java @@ -24,29 +24,25 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.base.Joiner; +import org.apache.commons.cli.*; +import org.apache.commons.lang3.StringUtils; import com.yammer.metrics.Metrics; import org.apache.cassandra.auth.IAuthenticator; -import org.apache.cassandra.cli.transport.FramedTransportFactory; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; +import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.db.ColumnFamilyType; +import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.db.marshal.*; -import org.apache.commons.cli.*; - -import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.util.CassandraClient; -import org.apache.cassandra.transport.SimpleClient; import org.apache.cassandra.thrift.*; -import org.apache.commons.lang3.StringUtils; - +import org.apache.cassandra.transport.SimpleClient; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportFactory; public class Session implements Serializable { @@ -175,7 +171,7 @@ public class Session implements Serializable public final boolean timeUUIDComparator; public double traceProbability = 0.0; public EncryptionOptions encOptions = new ClientEncryptionOptions(); - public TTransportFactory transportFactory = new FramedTransportFactory(); + public ITransportFactory transportFactory = new TFramedTransportFactory(); public Session(String[] arguments) throws IllegalArgumentException, SyntaxException { @@ -455,7 +451,10 @@ public class Session implements Serializable encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(","); if (cmd.hasOption("tf")) + { transportFactory = validateAndSetTransportFactory(cmd.getOptionValue("tf")); + configureTransportFactory(transportFactory, encOptions); + } if (cmd.hasOption("un")) username = cmd.getOptionValue("un"); @@ -476,17 +475,17 @@ public class Session implements Serializable sigma = numDifferentKeys * STDev; } - private TTransportFactory validateAndSetTransportFactory(String transportFactory) + private ITransportFactory validateAndSetTransportFactory(String transportFactory) { try { Class factory = Class.forName(transportFactory); - if(!TTransportFactory.class.isAssignableFrom(factory)) + if(!ITransportFactory.class.isAssignableFrom(factory)) throw new IllegalArgumentException(String.format("transport factory '%s' " + - "not derived from TTransportFactory", transportFactory)); + "not derived from ITransportFactory", transportFactory)); - return (TTransportFactory) factory.newInstance(); + return (ITransportFactory) factory.newInstance(); } catch (Exception e) { @@ -494,6 +493,34 @@ public class Session implements Serializable } } + private void configureTransportFactory(ITransportFactory transportFactory, EncryptionOptions encOptions) + { + Map<String, String> options = new HashMap<>(); + // If the supplied factory supports the same set of options as our SSL impl, set those + if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE)) + options.put(SSLTransportFactory.TRUSTSTORE, encOptions.truststore); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD)) + options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, encOptions.truststore_password); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL)) + options.put(SSLTransportFactory.PROTOCOL, encOptions.protocol); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES)) + options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(encOptions.cipher_suites)); + + if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE) + && encOptions.require_client_auth) + options.put(SSLTransportFactory.KEYSTORE, encOptions.keystore); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD) + && encOptions.require_client_auth) + options.put(SSLTransportFactory.KEYSTORE_PASSWORD, encOptions.keystore_password); + + // Now check if any of the factory's supported options are set as system properties + for (String optionKey : transportFactory.supportedOptions()) + if (System.getProperty(optionKey) != null) + options.put(optionKey, System.getProperty(optionKey)); + + transportFactory.setOptions(options); + } + public int getCardinality() { return cardinality; @@ -748,12 +775,11 @@ public class Session implements Serializable // random node selection for fake load balancing String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)]; - TSocket socket = new TSocket(currentNode, port); - TTransport transport = transportFactory.getTransport(socket); - CassandraClient client = new CassandraClient(new TBinaryProtocol(transport)); - try { + TTransport transport = transportFactory.openTransport(currentNode, port); + CassandraClient client = new CassandraClient(new TBinaryProtocol(transport)); + if (!transport.isOpen()) transport.open(); @@ -771,6 +797,7 @@ public class Session implements Serializable AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials); client.login(authenticationRequest); } + return client; } catch (AuthenticationException e) { @@ -788,8 +815,6 @@ public class Session implements Serializable { throw new RuntimeException(e.getMessage()); } - - return client; } public SimpleClient getNativeClient()