Repository: cassandra Updated Branches: refs/heads/trunk e856d8f81 -> 6d15a6da7
Extract LoaderOptions to be able to be used from outside patch by Eric Fenderbosch; reviewed by yukim for CASSANDRA-10637 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d15a6da Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d15a6da Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d15a6da Branch: refs/heads/trunk Commit: 6d15a6da7da8fc41f97d4346d65f6bf9d0e1637f Parents: e856d8f Author: Eric Fenderbosch <eric.fenderbo...@segmint.com> Authored: Mon Nov 2 14:50:35 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Feb 24 11:35:39 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/tools/BulkLoadException.java | 13 + .../org/apache/cassandra/tools/BulkLoader.java | 415 ++------------ .../apache/cassandra/tools/LoaderOptions.java | 537 +++++++++++++++++++ 4 files changed, 592 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d15a6da/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1d8664c..3aa62ae 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.4 + * Extract LoaderOptions to be able to be used from outside (CASSANDRA-10637) * fix OnDiskIndexTest to properly treat empty ranges (CASSANDRA-11205) * fix TrackerTest to handle new notifications (CASSANDRA-11178) * add SASI validation for partitioner and complex columns (CASSANDRA-11169) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d15a6da/src/java/org/apache/cassandra/tools/BulkLoadException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoadException.java b/src/java/org/apache/cassandra/tools/BulkLoadException.java new file mode 100644 index 0000000..fb5d459 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/BulkLoadException.java @@ -0,0 +1,13 @@ +package org.apache.cassandra.tools; + +public class BulkLoadException extends Exception +{ + + private static final long serialVersionUID = 1L; + + public BulkLoadException(Throwable cause) + { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d15a6da/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 63caae1..f19924e 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -17,26 +17,22 @@ */ package org.apache.cassandra.tools; -import java.io.File; import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; -import java.net.MalformedURLException; -import java.net.UnknownHostException; -import java.util.*; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import org.apache.commons.cli.*; +import java.util.Set; +import javax.net.ssl.SSLContext; import com.datastax.driver.core.AuthProvider; import com.datastax.driver.core.JdkSSLOptions; -import com.datastax.driver.core.PlainTextAuthProvider; import com.datastax.driver.core.SSLOptions; -import javax.net.ssl.SSLContext; -import org.apache.cassandra.config.*; -import org.apache.cassandra.exceptions.ConfigurationException; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.streaming.*; @@ -46,35 +42,15 @@ import org.apache.cassandra.utils.OutputHandler; public class BulkLoader { - private static final String TOOL_NAME = "sstableloader"; - private static final String VERBOSE_OPTION = "verbose"; - private static final String HELP_OPTION = "help"; - private static final String NOPROGRESS_OPTION = "no-progress"; - private static final String IGNORE_NODES_OPTION = "ignore"; - private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes"; - private static final String NATIVE_PORT_OPTION = "port"; - private static final String USER_OPTION = "username"; - private static final String PASSWD_OPTION = "password"; - private static final String AUTH_PROVIDER_OPTION = "auth-provider"; - private static final String THROTTLE_MBITS = "throttle"; - private static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle"; - - /* client encryption options */ - private static final String SSL_TRUSTSTORE = "truststore"; - private static final String SSL_TRUSTSTORE_PW = "truststore-password"; - private static final String SSL_KEYSTORE = "keystore"; - private static final String SSL_KEYSTORE_PW = "keystore-password"; - private static final String SSL_PROTOCOL = "ssl-protocol"; - private static final String SSL_ALGORITHM = "ssl-alg"; - private static final String SSL_STORE_TYPE = "store-type"; - private static final String SSL_CIPHER_SUITES = "ssl-ciphers"; - private static final String CONNECTIONS_PER_HOST = "connections-per-host"; - private static final String CONFIG_PATH = "conf-path"; - - public static void main(String args[]) + public static void main(String args[]) throws BulkLoadException + { + LoaderOptions options = LoaderOptions.builder().parseArgs(args).build(); + load(options); + } + + public static void load(LoaderOptions options) throws BulkLoadException { Config.setClientMode(true); - LoaderOptions options = LoaderOptions.parseArgs(args).validateArguments(); OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); SSTableLoader loader = new SSTableLoader( options.directory, @@ -86,8 +62,8 @@ public class BulkLoader options.sslStoragePort, options.serverEncOptions, buildSSLOptions((EncryptionOptions.ClientEncryptionOptions)options.encOptions)), - handler, - options.connectionsPerHost); + handler, + options.connectionsPerHost); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle); DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(options.interDcThrottle); StreamResultFuture future = null; @@ -110,9 +86,11 @@ public class BulkLoader JVMStabilityInspector.inspectThrowable(e); System.err.println(e.getMessage()); if (e.getCause() != null) + { System.err.println(e.getCause()); + } e.printStackTrace(System.err); - System.exit(1); + throw new BulkLoadException(e); } try @@ -120,18 +98,20 @@ public class BulkLoader future.get(); if (!options.noProgress) + { indicator.printSummary(options.connectionsPerHost); + } // Give sockets time to gracefully close Thread.sleep(1000); - System.exit(0); // We need that to stop non daemonized threads + // System.exit(0); // We need that to stop non daemonized threads } catch (Exception e) { System.err.println("Streaming to the following hosts failed:"); System.err.println(loader.getFailedHosts()); e.printStackTrace(System.err); - System.exit(1); + throw new BulkLoadException(e); } } @@ -196,14 +176,16 @@ public class BulkLoader long current = 0; int completed = 0; - if (progressInfo != null && session.peer.equals(progressInfo.peer) && (session.sessionIndex == progressInfo.sessionIndex)) + if (progressInfo != null && session.peer.equals(progressInfo.peer) && session.sessionIndex == progressInfo.sessionIndex) { session.updateProgress(progressInfo); } for (ProgressInfo progress : session.getSendingFiles()) { if (progress.isCompleted()) + { completed++; + } current += progress.currentBytes; } totalProgress += current; @@ -215,7 +197,9 @@ public class BulkLoader sb.append(" ").append(String.format("%-3d", size == 0 ? 100L : current * 100L / size)).append("% "); if (updateTotalFiles) + { totalFiles += session.getTotalFilesToSend(); + } } } @@ -225,9 +209,11 @@ public class BulkLoader sb.append("total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% "); sb.append(String.format("%-3d", mbPerSec(deltaProgress, deltaTime))).append("MB/s"); - int average = mbPerSec(totalProgress, (time - start)); + int average = mbPerSec(totalProgress, time - start); if (average > peak) + { peak = average; + } sb.append("(avg: ").append(average).append(" MB/s)"); System.out.print(sb.toString()); @@ -236,15 +222,15 @@ public class BulkLoader private int mbPerSec(long bytes, long timeInNano) { - double bytesPerNano = ((double)bytes) / timeInNano; - return (int)((bytesPerNano * 1000 * 1000 * 1000) / (1024 * 1024)); + double bytesPerNano = (double)bytes / timeInNano; + return (int)(bytesPerNano * 1000 * 1000 * 1000 / (1024 * 1024)); } private void printSummary(int connectionsPerHost) { long end = System.nanoTime(); - long durationMS = ((end - start) / (1000000)); - int average = mbPerSec(lastProgress, (end - start)); + long durationMS = (end - start) / 1000000; + int average = mbPerSec(lastProgress, end - start); StringBuilder sb = new StringBuilder(); sb.append("\nSummary statistics: \n"); sb.append(String.format(" %-30s: %-10d%n", "Connections per host: ", connectionsPerHost)); @@ -261,7 +247,9 @@ public class BulkLoader { if (!clientEncryptionOptions.enabled) + { return null; + } SSLContext sslContext; try @@ -296,7 +284,7 @@ public class BulkLoader super(hosts, port, authProvider, sslOptions); this.storagePort = storagePort; this.sslStoragePort = sslStoragePort; - this.serverEncOptions = serverEncryptionOptions; + serverEncOptions = serverEncryptionOptions; } @Override @@ -306,327 +294,6 @@ public class BulkLoader } } - static class LoaderOptions - { - public final File directory; - - public boolean debug; - public boolean verbose; - public boolean noProgress; - public int nativePort = 9042; - public String user; - public String passwd; - public String authProviderName; - public AuthProvider authProvider; - public int throttle = 0; - public int interDcThrottle = 0; - public int storagePort; - public int sslStoragePort; - public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions(); - public int connectionsPerHost = 1; - public EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions(); - - public final Set<InetAddress> hosts = new HashSet<>(); - public final Set<InetAddress> ignores = new HashSet<>(); - - LoaderOptions(File directory) - { - this.directory = directory; - } - - public static LoaderOptions parseArgs(String cmdArgs[]) - { - CommandLineParser parser = new GnuParser(); - CmdLineOptions options = getCmdLineOptions(); - try - { - CommandLine cmd = parser.parse(options, cmdArgs, false); - - if (cmd.hasOption(HELP_OPTION)) - { - printUsage(options); - System.exit(0); - } - - String[] args = cmd.getArgs(); - if (args.length == 0) - { - System.err.println("Missing sstable directory argument"); - printUsage(options); - System.exit(1); - } - - if (args.length > 1) - { - System.err.println("Too many arguments"); - printUsage(options); - System.exit(1); - } - - String dirname = args[0]; - File dir = new File(dirname); - - if (!dir.exists()) - errorMsg("Unknown directory: " + dirname, options); - - if (!dir.isDirectory()) - errorMsg(dirname + " is not a directory", options); - - LoaderOptions opts = new LoaderOptions(dir); - - opts.verbose = cmd.hasOption(VERBOSE_OPTION); - opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION); - - if (cmd.hasOption(NATIVE_PORT_OPTION)) - opts.nativePort = Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION)); - - if (cmd.hasOption(USER_OPTION)) - opts.user = cmd.getOptionValue(USER_OPTION); - - if (cmd.hasOption(PASSWD_OPTION)) - opts.passwd = cmd.getOptionValue(PASSWD_OPTION); - - if (cmd.hasOption(AUTH_PROVIDER_OPTION)) - opts.authProviderName = cmd.getOptionValue(AUTH_PROVIDER_OPTION); - - if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION)) - { - String[] nodes = cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(","); - try - { - for (String node : nodes) - { - opts.hosts.add(InetAddress.getByName(node.trim())); - } - } - catch (UnknownHostException e) - { - errorMsg("Unknown host: " + e.getMessage(), options); - } - - } - else - { - System.err.println("Initial hosts must be specified (-d)"); - printUsage(options); - System.exit(1); - } - - if (cmd.hasOption(IGNORE_NODES_OPTION)) - { - String[] nodes = cmd.getOptionValue(IGNORE_NODES_OPTION).split(","); - try - { - for (String node : nodes) - { - opts.ignores.add(InetAddress.getByName(node.trim())); - } - } - catch (UnknownHostException e) - { - errorMsg("Unknown host: " + e.getMessage(), options); - } - } - - if (cmd.hasOption(CONNECTIONS_PER_HOST)) - opts.connectionsPerHost = Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST)); - - // try to load config file first, so that values can be rewritten with other option values. - // otherwise use default config. - Config config; - if (cmd.hasOption(CONFIG_PATH)) - { - File configFile = new File(cmd.getOptionValue(CONFIG_PATH)); - if (!configFile.exists()) - { - errorMsg("Config file not found", options); - } - config = new YamlConfigurationLoader().loadConfig(configFile.toURI().toURL()); - } - else - { - config = new Config(); - // unthrottle stream by default - config.stream_throughput_outbound_megabits_per_sec = 0; - config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0; - } - opts.storagePort = config.storage_port; - opts.sslStoragePort = config.ssl_storage_port; - opts.throttle = config.stream_throughput_outbound_megabits_per_sec; - opts.interDcThrottle = config.inter_dc_stream_throughput_outbound_megabits_per_sec; - opts.encOptions = config.client_encryption_options; - opts.serverEncOptions = config.server_encryption_options; - - if (cmd.hasOption(THROTTLE_MBITS)) - { - opts.throttle = Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS)); - } - - if (cmd.hasOption(INTER_DC_THROTTLE_MBITS)) - { - opts.interDcThrottle = Integer.parseInt(cmd.getOptionValue(INTER_DC_THROTTLE_MBITS)); - } - - if (cmd.hasOption(SSL_TRUSTSTORE)) - { - opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE); - } - - if (cmd.hasOption(SSL_TRUSTSTORE_PW)) - { - opts.encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW); - } - - if (cmd.hasOption(SSL_KEYSTORE)) - { - opts.encOptions.keystore = cmd.getOptionValue(SSL_KEYSTORE); - // if a keystore was provided, lets assume we'll need to use it - opts.encOptions.require_client_auth = true; - } - - if (cmd.hasOption(SSL_KEYSTORE_PW)) - { - opts.encOptions.keystore_password = cmd.getOptionValue(SSL_KEYSTORE_PW); - } - - if (cmd.hasOption(SSL_PROTOCOL)) - { - opts.encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL); - } - - if (cmd.hasOption(SSL_ALGORITHM)) - { - opts.encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM); - } - - if (cmd.hasOption(SSL_STORE_TYPE)) - { - opts.encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE); - } - - if (cmd.hasOption(SSL_CIPHER_SUITES)) - { - opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(","); - } - - return opts; - } - catch (ParseException | ConfigurationException | MalformedURLException e) - { - errorMsg(e.getMessage(), options); - return null; - } - } - - public LoaderOptions validateArguments() - { - // Both username and password need to be provided - if ((user != null) != (passwd != null)) - errorMsg("Username and password must both be provided", getCmdLineOptions()); - - if (user != null) - { - // Support for 3rd party auth providers that support plain text credentials. - // In this case the auth provider must provide a constructor of the form: - // - // public MyAuthProvider(String username, String password) - if (authProviderName != null) - { - try - { - Class authProviderClass = Class.forName(authProviderName); - Constructor constructor = authProviderClass.getConstructor(String.class, String.class); - authProvider = (AuthProvider)constructor.newInstance(user, passwd); - } - catch (ClassNotFoundException e) - { - errorMsg("Unknown auth provider: " + e.getMessage(), getCmdLineOptions()); - } - catch (NoSuchMethodException e) - { - errorMsg("Auth provider does not support plain text credentials: " + e.getMessage(), getCmdLineOptions()); - } - catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) - { - errorMsg("Could not create auth provider with plain text credentials: " + e.getMessage(), getCmdLineOptions()); - } - } - else - { - // If a 3rd party auth provider wasn't provided use the driver plain text provider - authProvider = new PlainTextAuthProvider(user, passwd); - } - } - // Alternate support for 3rd party auth providers that don't use plain text credentials. - // In this case the auth provider must provide a nullary constructor of the form: - // - // public MyAuthProvider() - else if (authProviderName != null) - { - try - { - authProvider = (AuthProvider)Class.forName(authProviderName).newInstance(); - } - catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) - { - errorMsg("Unknown auth provider" + e.getMessage(), getCmdLineOptions()); - } - } - - return this; - } - - private static void errorMsg(String msg, CmdLineOptions options) - { - System.err.println(msg); - printUsage(options); - System.exit(1); - } - - private static CmdLineOptions getCmdLineOptions() - { - CmdLineOptions options = new CmdLineOptions(); - options.addOption("v", VERBOSE_OPTION, "verbose output"); - options.addOption("h", HELP_OPTION, "display this help message"); - options.addOption(null, NOPROGRESS_OPTION, "don't display progress"); - options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes"); - options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. try to connect to these hosts (comma separated) initially for ring information"); - options.addOption("p", NATIVE_PORT_OPTION, "rpc port", "port used for native connection (default 9042)"); - options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)"); - options.addOption("idct", INTER_DC_THROTTLE_MBITS, "inter-dc-throttle", "inter-datacenter throttle speed in Mbits (default unlimited)"); - options.addOption("u", USER_OPTION, "username", "username for cassandra authentication"); - options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication"); - options.addOption("ap", AUTH_PROVIDER_OPTION, "auth provider", "custom AuthProvider class name for cassandra authentication"); - options.addOption("cph", CONNECTIONS_PER_HOST, "connectionsPerHost", "number of concurrent connections-per-host."); - // ssl connection-related options - options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "Client SSL: full path to truststore"); - options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "Client SSL: password of the truststore"); - options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "Client SSL: full path to keystore"); - options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "Client SSL: password of the keystore"); - options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "Client SSL: connections protocol to use (default: TLS)"); - options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "Client SSL: algorithm (default: SunX509)"); - options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: type of store"); - options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use"); - options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL."); - return options; - } - - public static void printUsage(Options options) - { - String usage = String.format("%s [options] <dir_path>", TOOL_NAME); - String header = System.lineSeparator() + - "Bulk load the sstables found in the directory <dir_path> to the configured cluster." + - "The parent directories of <dir_path> are used as the target keyspace/table name. " + - "So for instance, to load an sstable named Standard1-g-1-Data.db into Keyspace1/Standard1, " + - "you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db into a directory /path/to/Keyspace1/Standard1/."; - String footer = System.lineSeparator() + - "You can provide cassandra.yaml file with -f command line option to set up streaming throughput, client and server encryption options. " + - "Only stream_throughput_outbound_megabits_per_sec, inter_dc_stream_throughput_outbound_megabits_per_sec, server_encryption_options and client_encryption_options are read from yaml. " + - "You can override options read from cassandra.yaml with corresponding command line options."; - new HelpFormatter().printHelp(usage, header, options, footer); - } - } - public static class CmdLineOptions extends Options { /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d15a6da/src/java/org/apache/cassandra/tools/LoaderOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java new file mode 100644 index 0000000..6a6e575 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java @@ -0,0 +1,537 @@ +package org.apache.cassandra.tools; + +import java.io.File; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.*; +import java.util.HashSet; +import java.util.Set; + +import org.apache.cassandra.config.*; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.tools.BulkLoader.CmdLineOptions; + +import com.datastax.driver.core.AuthProvider; +import com.datastax.driver.core.PlainTextAuthProvider; +import org.apache.commons.cli.*; + +public class LoaderOptions +{ + + public static final String HELP_OPTION = "help"; + public static final String VERBOSE_OPTION = "verbose"; + public static final String NOPROGRESS_OPTION = "no-progress"; + public static final String NATIVE_PORT_OPTION = "port"; + public static final String USER_OPTION = "username"; + public static final String PASSWD_OPTION = "password"; + public static final String AUTH_PROVIDER_OPTION = "auth-provider"; + public static final String INITIAL_HOST_ADDRESS_OPTION = "nodes"; + public static final String IGNORE_NODES_OPTION = "ignore"; + public static final String CONNECTIONS_PER_HOST = "connections-per-host"; + public static final String CONFIG_PATH = "conf-path"; + public static final String THROTTLE_MBITS = "throttle"; + public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle"; + public static final String TOOL_NAME = "sstableloader"; + + /* client encryption options */ + public static final String SSL_TRUSTSTORE = "truststore"; + public static final String SSL_TRUSTSTORE_PW = "truststore-password"; + public static final String SSL_KEYSTORE = "keystore"; + public static final String SSL_KEYSTORE_PW = "keystore-password"; + public static final String SSL_PROTOCOL = "ssl-protocol"; + public static final String SSL_ALGORITHM = "ssl-alg"; + public static final String SSL_STORE_TYPE = "store-type"; + public static final String SSL_CIPHER_SUITES = "ssl-ciphers"; + + public final File directory; + public final boolean debug; + public final boolean verbose; + public final boolean noProgress; + public final int nativePort; + public final String user; + public final String passwd; + public final AuthProvider authProvider; + public final int throttle; + public final int interDcThrottle; + public final int storagePort; + public final int sslStoragePort; + public final EncryptionOptions encOptions; + public final int connectionsPerHost; + public final EncryptionOptions.ServerEncryptionOptions serverEncOptions; + public final Set<InetAddress> hosts; + public final Set<InetAddress> ignores = new HashSet<>(); + + LoaderOptions(Builder builder) + { + directory = builder.directory; + debug = builder.debug; + verbose = builder.verbose; + noProgress = builder.noProgress; + nativePort = builder.nativePort; + user = builder.user; + passwd = builder.passwd; + authProvider = builder.authProvider; + throttle = builder.throttle; + interDcThrottle = builder.interDcThrottle; + storagePort = builder.storagePort; + sslStoragePort = builder.sslStoragePort; + encOptions = builder.encOptions; + connectionsPerHost = builder.connectionsPerHost; + serverEncOptions = builder.serverEncOptions; + hosts = builder.hosts; + } + + static class Builder + { + File directory; + boolean debug; + boolean verbose; + boolean noProgress; + int nativePort = 9042; + String user; + String passwd; + String authProviderName; + AuthProvider authProvider; + int throttle = 0; + int interDcThrottle = 0; + int storagePort; + int sslStoragePort; + EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions(); + int connectionsPerHost = 1; + EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions(); + Set<InetAddress> hosts = new HashSet<>(); + Set<InetAddress> ignores = new HashSet<>(); + + Builder() + { + // + } + + public LoaderOptions build() + { + constructAuthProvider(); + return new LoaderOptions(this); + } + + public Builder directory(File directory) + { + this.directory = directory; + return this; + } + + public Builder debug(boolean debug) + { + this.debug = debug; + return this; + } + + public Builder verbose(boolean verbose) + { + this.verbose = verbose; + return this; + } + + public Builder noProgress(boolean noProgress) + { + this.noProgress = noProgress; + return this; + } + + public Builder nativePort(int nativePort) + { + this.nativePort = nativePort; + return this; + } + + public Builder user(String user) + { + this.user = user; + return this; + } + + public Builder password(String passwd) + { + this.passwd = passwd; + return this; + } + + public Builder authProvider(AuthProvider authProvider) + { + this.authProvider = authProvider; + return this; + } + + public Builder throttle(int throttle) + { + this.throttle = throttle; + return this; + } + + public Builder interDcThrottle(int interDcThrottle) + { + this.interDcThrottle = interDcThrottle; + return this; + } + + public Builder storagePort(int storagePort) + { + this.storagePort = storagePort; + return this; + } + + public Builder sslStoragePort(int sslStoragePort) + { + this.sslStoragePort = sslStoragePort; + return this; + } + + public Builder encOptions(EncryptionOptions encOptions) + { + this.encOptions = encOptions; + return this; + } + + public Builder connectionsPerHost(int connectionsPerHost) + { + this.connectionsPerHost = connectionsPerHost; + return this; + } + + public Builder serverEncOptions(EncryptionOptions.ServerEncryptionOptions serverEncOptions) + { + this.serverEncOptions = serverEncOptions; + return this; + } + + public Builder hosts(Set<InetAddress> hosts) + { + this.hosts = hosts; + return this; + } + + public Builder host(InetAddress host) + { + hosts.add(host); + return this; + } + + public Builder ignore(Set<InetAddress> ignores) + { + this.ignores = ignores; + return this; + } + + public Builder ignore(InetAddress ignore) + { + ignores.add(ignore); + return this; + } + + public Builder parseArgs(String cmdArgs[]) + { + CommandLineParser parser = new GnuParser(); + CmdLineOptions options = getCmdLineOptions(); + try + { + CommandLine cmd = parser.parse(options, cmdArgs, false); + + if (cmd.hasOption(HELP_OPTION)) + { + printUsage(options); + System.exit(0); + } + + String[] args = cmd.getArgs(); + if (args.length == 0) + { + System.err.println("Missing sstable directory argument"); + printUsage(options); + System.exit(1); + } + + if (args.length > 1) + { + System.err.println("Too many arguments"); + printUsage(options); + System.exit(1); + } + + String dirname = args[0]; + File dir = new File(dirname); + + if (!dir.exists()) + { + errorMsg("Unknown directory: " + dirname, options); + } + + if (!dir.isDirectory()) + { + errorMsg(dirname + " is not a directory", options); + } + + directory = dir; + + verbose = cmd.hasOption(VERBOSE_OPTION); + noProgress = cmd.hasOption(NOPROGRESS_OPTION); + + if (cmd.hasOption(NATIVE_PORT_OPTION)) + { + nativePort = Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION)); + } + + if (cmd.hasOption(USER_OPTION)) + { + user = cmd.getOptionValue(USER_OPTION); + } + + if (cmd.hasOption(PASSWD_OPTION)) + { + passwd = cmd.getOptionValue(PASSWD_OPTION); + } + + if (cmd.hasOption(AUTH_PROVIDER_OPTION)) + { + authProviderName = cmd.getOptionValue(AUTH_PROVIDER_OPTION); + } + + if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION)) + { + String[] nodes = cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(","); + try + { + for (String node : nodes) + { + hosts.add(InetAddress.getByName(node.trim())); + } + } catch (UnknownHostException e) + { + errorMsg("Unknown host: " + e.getMessage(), options); + } + + } else + { + System.err.println("Initial hosts must be specified (-d)"); + printUsage(options); + System.exit(1); + } + + if (cmd.hasOption(IGNORE_NODES_OPTION)) + { + String[] nodes = cmd.getOptionValue(IGNORE_NODES_OPTION).split(","); + try + { + for (String node : nodes) + { + ignores.add(InetAddress.getByName(node.trim())); + } + } catch (UnknownHostException e) + { + errorMsg("Unknown host: " + e.getMessage(), options); + } + } + + if (cmd.hasOption(CONNECTIONS_PER_HOST)) + { + connectionsPerHost = Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST)); + } + + // try to load config file first, so that values can be + // rewritten with other option values. + // otherwise use default config. + Config config; + if (cmd.hasOption(CONFIG_PATH)) + { + File configFile = new File(cmd.getOptionValue(CONFIG_PATH)); + if (!configFile.exists()) + { + errorMsg("Config file not found", options); + } + config = new YamlConfigurationLoader().loadConfig(configFile.toURI().toURL()); + } + else + { + config = new Config(); + // unthrottle stream by default + config.stream_throughput_outbound_megabits_per_sec = 0; + config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0; + } + storagePort = config.storage_port; + sslStoragePort = config.ssl_storage_port; + throttle = config.stream_throughput_outbound_megabits_per_sec; + encOptions = config.client_encryption_options; + serverEncOptions = config.server_encryption_options; + + if (cmd.hasOption(THROTTLE_MBITS)) + { + throttle = Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS)); + } + + if (cmd.hasOption(INTER_DC_THROTTLE_MBITS)) + { + interDcThrottle = Integer.parseInt(cmd.getOptionValue(INTER_DC_THROTTLE_MBITS)); + } + + if (cmd.hasOption(SSL_TRUSTSTORE)) + { + encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE); + } + + if (cmd.hasOption(SSL_TRUSTSTORE_PW)) + { + encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW); + } + + if (cmd.hasOption(SSL_KEYSTORE)) + { + encOptions.keystore = cmd.getOptionValue(SSL_KEYSTORE); + // if a keystore was provided, lets assume we'll need to use + // it + encOptions.require_client_auth = true; + } + + if (cmd.hasOption(SSL_KEYSTORE_PW)) + { + encOptions.keystore_password = cmd.getOptionValue(SSL_KEYSTORE_PW); + } + + if (cmd.hasOption(SSL_PROTOCOL)) + { + encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL); + } + + if (cmd.hasOption(SSL_ALGORITHM)) + { + encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM); + } + + if (cmd.hasOption(SSL_STORE_TYPE)) + { + encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE); + } + + if (cmd.hasOption(SSL_CIPHER_SUITES)) + { + encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(","); + } + + return this; + } + catch (ParseException | ConfigurationException | MalformedURLException e) + { + errorMsg(e.getMessage(), options); + return null; + } + } + + private void constructAuthProvider() + { + // Both username and password need to be provided + if ((user != null) != (passwd != null)) + errorMsg("Username and password must both be provided", getCmdLineOptions()); + + if (user != null) + { + // Support for 3rd party auth providers that support plain text credentials. + // In this case the auth provider must provide a constructor of the form: + // + // public MyAuthProvider(String username, String password) + if (authProviderName != null) + { + try + { + Class authProviderClass = Class.forName(authProviderName); + Constructor constructor = authProviderClass.getConstructor(String.class, String.class); + authProvider = (AuthProvider)constructor.newInstance(user, passwd); + } + catch (ClassNotFoundException e) + { + errorMsg("Unknown auth provider: " + e.getMessage(), getCmdLineOptions()); + } + catch (NoSuchMethodException e) + { + errorMsg("Auth provider does not support plain text credentials: " + e.getMessage(), getCmdLineOptions()); + } + catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) + { + errorMsg("Could not create auth provider with plain text credentials: " + e.getMessage(), getCmdLineOptions()); + } + } + else + { + // If a 3rd party auth provider wasn't provided use the driver plain text provider + this.authProvider = new PlainTextAuthProvider(user, passwd); + } + } + // Alternate support for 3rd party auth providers that don't use plain text credentials. + // In this case the auth provider must provide a nullary constructor of the form: + // + // public MyAuthProvider() + else if (authProviderName != null) + { + try + { + authProvider = (AuthProvider)Class.forName(authProviderName).newInstance(); + } + catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) + { + errorMsg("Unknown auth provider: " + e.getMessage(), getCmdLineOptions()); + } + } + } + } + + public static Builder builder() + { + return new Builder(); + } + + private static void errorMsg(String msg, CmdLineOptions options) + { + System.err.println(msg); + printUsage(options); + System.exit(1); + } + + private static CmdLineOptions getCmdLineOptions() + { + CmdLineOptions options = new CmdLineOptions(); + options.addOption("v", VERBOSE_OPTION, "verbose output"); + options.addOption("h", HELP_OPTION, "display this help message"); + options.addOption(null, NOPROGRESS_OPTION, "don't display progress"); + options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes"); + options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. try to connect to these hosts (comma separated) initially for ring information"); + options.addOption("p", NATIVE_PORT_OPTION, "rpc port", "port used for native connection (default 9042)"); + options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)"); + options.addOption("idct", INTER_DC_THROTTLE_MBITS, "inter-dc-throttle", "inter-datacenter throttle speed in Mbits (default unlimited)"); + options.addOption("u", USER_OPTION, "username", "username for cassandra authentication"); + options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication"); + options.addOption("ap", AUTH_PROVIDER_OPTION, "auth provider", "custom AuthProvider class name for cassandra authentication"); + options.addOption("cph", CONNECTIONS_PER_HOST, "connectionsPerHost", "number of concurrent connections-per-host."); + // ssl connection-related options + options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "Client SSL: full path to truststore"); + options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "Client SSL: password of the truststore"); + options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "Client SSL: full path to keystore"); + options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "Client SSL: password of the keystore"); + options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "Client SSL: connections protocol to use (default: TLS)"); + options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "Client SSL: algorithm (default: SunX509)"); + options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: type of store"); + options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use"); + options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL."); + return options; + } + + public static void printUsage(Options options) + { + String usage = String.format("%s [options] <dir_path>", TOOL_NAME); + String header = System.lineSeparator() + + "Bulk load the sstables found in the directory <dir_path> to the configured cluster." + + "The parent directories of <dir_path> are used as the target keyspace/table name. " + + "So for instance, to load an sstable named Standard1-g-1-Data.db into Keyspace1/Standard1, " + + "you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db into a directory /path/to/Keyspace1/Standard1/."; + String footer = System.lineSeparator() + + "You can provide cassandra.yaml file with -f command line option to set up streaming throughput, client and server encryption options. " + + "Only stream_throughput_outbound_megabits_per_sec, server_encryption_options and client_encryption_options are read from yaml. " + + "You can override options read from cassandra.yaml with corresponding command line options."; + new HelpFormatter().printHelp(usage, header, options, footer); + } +} \ No newline at end of file