Repository: cassandra
Updated Branches:
  refs/heads/trunk e856d8f81 -> 6d15a6da7


Extract LoaderOptions to be able to be used from outside

patch by Eric Fenderbosch; reviewed by yukim for CASSANDRA-10637


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d15a6da
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d15a6da
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d15a6da

Branch: refs/heads/trunk
Commit: 6d15a6da7da8fc41f97d4346d65f6bf9d0e1637f
Parents: e856d8f
Author: Eric Fenderbosch <eric.fenderbo...@segmint.com>
Authored: Mon Nov 2 14:50:35 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Feb 24 11:35:39 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/tools/BulkLoadException.java      |  13 +
 .../org/apache/cassandra/tools/BulkLoader.java  | 415 ++------------
 .../apache/cassandra/tools/LoaderOptions.java   | 537 +++++++++++++++++++
 4 files changed, 592 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d15a6da/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1d8664c..3aa62ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.4
+ * Extract LoaderOptions to be able to be used from outside (CASSANDRA-10637)
  * fix OnDiskIndexTest to properly treat empty ranges (CASSANDRA-11205)
  * fix TrackerTest to handle new notifications (CASSANDRA-11178)
  * add SASI validation for partitioner and complex columns (CASSANDRA-11169)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d15a6da/src/java/org/apache/cassandra/tools/BulkLoadException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadException.java b/src/java/org/apache/cassandra/tools/BulkLoadException.java
new file mode 100644
index 0000000..fb5d459
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/BulkLoadException.java
@@ -0,0 +1,13 @@
+package org.apache.cassandra.tools;
+
+public class BulkLoadException extends Exception
+{
+
+    private static final long serialVersionUID = 1L;
+
+    public BulkLoadException(Throwable cause)
+    {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d15a6da/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 63caae1..f19924e 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -17,26 +17,22 @@
  */
 package org.apache.cassandra.tools;
 
-import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.UnknownHostException;
-import java.util.*;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.commons.cli.*;
+import java.util.Set;
+import javax.net.ssl.SSLContext;
 
 import com.datastax.driver.core.AuthProvider;
 import com.datastax.driver.core.JdkSSLOptions;
-import com.datastax.driver.core.PlainTextAuthProvider;
 import com.datastax.driver.core.SSLOptions;
-import javax.net.ssl.SSLContext;
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.streaming.*;
@@ -46,35 +42,15 @@ import org.apache.cassandra.utils.OutputHandler;
 
 public class BulkLoader
 {
-    private static final String TOOL_NAME = "sstableloader";
-    private static final String VERBOSE_OPTION  = "verbose";
-    private static final String HELP_OPTION  = "help";
-    private static final String NOPROGRESS_OPTION  = "no-progress";
-    private static final String IGNORE_NODES_OPTION  = "ignore";
-    private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
-    private static final String NATIVE_PORT_OPTION = "port";
-    private static final String USER_OPTION = "username";
-    private static final String PASSWD_OPTION = "password";
-    private static final String AUTH_PROVIDER_OPTION = "auth-provider";
-    private static final String THROTTLE_MBITS = "throttle";
-    private static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
-
-    /* client encryption options */
-    private static final String SSL_TRUSTSTORE = "truststore";
-    private static final String SSL_TRUSTSTORE_PW = "truststore-password";
-    private static final String SSL_KEYSTORE = "keystore";
-    private static final String SSL_KEYSTORE_PW = "keystore-password";
-    private static final String SSL_PROTOCOL = "ssl-protocol";
-    private static final String SSL_ALGORITHM = "ssl-alg";
-    private static final String SSL_STORE_TYPE = "store-type";
-    private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
-    private static final String CONNECTIONS_PER_HOST = "connections-per-host";
-    private static final String CONFIG_PATH = "conf-path";
-
-    public static void main(String args[])
+    public static void main(String args[]) throws BulkLoadException
+    {
+        LoaderOptions options = LoaderOptions.builder().parseArgs(args).build();
+        load(options);
+    }
+
+    public static void load(LoaderOptions options) throws BulkLoadException
     {
         Config.setClientMode(true);
-        LoaderOptions options = LoaderOptions.parseArgs(args).validateArguments();
         OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
         SSTableLoader loader = new SSTableLoader(
                 options.directory,
@@ -86,8 +62,8 @@ public class BulkLoader
                         options.sslStoragePort,
                         options.serverEncOptions,
                         
buildSSLOptions((EncryptionOptions.ClientEncryptionOptions)options.encOptions)),
-                handler,
-                options.connectionsPerHost);
+                        handler,
+                        options.connectionsPerHost);
         
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
         
DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(options.interDcThrottle);
         StreamResultFuture future = null;
@@ -110,9 +86,11 @@ public class BulkLoader
             JVMStabilityInspector.inspectThrowable(e);
             System.err.println(e.getMessage());
             if (e.getCause() != null)
+            {
                 System.err.println(e.getCause());
+            }
             e.printStackTrace(System.err);
-            System.exit(1);
+            throw new BulkLoadException(e);
         }
 
         try
@@ -120,18 +98,20 @@ public class BulkLoader
             future.get();
 
             if (!options.noProgress)
+            {
                 indicator.printSummary(options.connectionsPerHost);
+            }
 
             // Give sockets time to gracefully close
             Thread.sleep(1000);
-            System.exit(0); // We need that to stop non daemonized threads
+            // System.exit(0); // We need that to stop non daemonized threads
         }
         catch (Exception e)
         {
             System.err.println("Streaming to the following hosts failed:");
             System.err.println(loader.getFailedHosts());
             e.printStackTrace(System.err);
-            System.exit(1);
+            throw new BulkLoadException(e);
         }
     }
 
@@ -196,14 +176,16 @@ public class BulkLoader
                         long current = 0;
                         int completed = 0;
 
-                        if (progressInfo != null && session.peer.equals(progressInfo.peer) && (session.sessionIndex == progressInfo.sessionIndex))
+                        if (progressInfo != null && session.peer.equals(progressInfo.peer) && session.sessionIndex == progressInfo.sessionIndex)
                         {
                             session.updateProgress(progressInfo);
                         }
                         for (ProgressInfo progress : session.getSendingFiles())
                         {
                             if (progress.isCompleted())
+                            {
                                 completed++;
+                            }
                             current += progress.currentBytes;
                         }
                         totalProgress += current;
@@ -215,7 +197,9 @@ public class BulkLoader
                         sb.append(" ").append(String.format("%-3d", size == 0 
? 100L : current * 100L / size)).append("% ");
 
                         if (updateTotalFiles)
+                        {
                             totalFiles += session.getTotalFilesToSend();
+                        }
                     }
                 }
 
@@ -225,9 +209,11 @@ public class BulkLoader
 
                 sb.append("total: ").append(totalSize == 0 ? 100L : 
totalProgress * 100L / totalSize).append("% ");
                 sb.append(String.format("%-3d", mbPerSec(deltaProgress, 
deltaTime))).append("MB/s");
-                int average = mbPerSec(totalProgress, (time - start));
+                int average = mbPerSec(totalProgress, time - start);
                 if (average > peak)
+                {
                     peak = average;
+                }
                 sb.append("(avg: ").append(average).append(" MB/s)");
 
                 System.out.print(sb.toString());
@@ -236,15 +222,15 @@ public class BulkLoader
 
         private int mbPerSec(long bytes, long timeInNano)
         {
-            double bytesPerNano = ((double)bytes) / timeInNano;
-            return (int)((bytesPerNano * 1000 * 1000 * 1000) / (1024 * 1024));
+            double bytesPerNano = (double)bytes / timeInNano;
+            return (int)(bytesPerNano * 1000 * 1000 * 1000 / (1024 * 1024));
         }
 
         private void printSummary(int connectionsPerHost)
         {
             long end = System.nanoTime();
-            long durationMS = ((end - start) / (1000000));
-            int average = mbPerSec(lastProgress, (end - start));
+            long durationMS = (end - start) / 1000000;
+            int average = mbPerSec(lastProgress, end - start);
             StringBuilder sb = new StringBuilder();
             sb.append("\nSummary statistics: \n");
             sb.append(String.format("   %-30s: %-10d%n", "Connections per 
host: ", connectionsPerHost));
@@ -261,7 +247,9 @@ public class BulkLoader
     {
 
         if (!clientEncryptionOptions.enabled)
+        {
             return null;
+        }
 
         SSLContext sslContext;
         try
@@ -296,7 +284,7 @@ public class BulkLoader
             super(hosts, port, authProvider, sslOptions);
             this.storagePort = storagePort;
             this.sslStoragePort = sslStoragePort;
-            this.serverEncOptions = serverEncryptionOptions;
+            serverEncOptions = serverEncryptionOptions;
         }
 
         @Override
@@ -306,327 +294,6 @@ public class BulkLoader
         }
     }
 
-    static class LoaderOptions
-    {
-        public final File directory;
-
-        public boolean debug;
-        public boolean verbose;
-        public boolean noProgress;
-        public int nativePort = 9042;
-        public String user;
-        public String passwd;
-        public String authProviderName;
-        public AuthProvider authProvider;
-        public int throttle = 0;
-        public int interDcThrottle = 0;
-        public int storagePort;
-        public int sslStoragePort;
-        public EncryptionOptions encOptions = new 
EncryptionOptions.ClientEncryptionOptions();
-        public int connectionsPerHost = 1;
-        public EncryptionOptions.ServerEncryptionOptions serverEncOptions = 
new EncryptionOptions.ServerEncryptionOptions();
-
-        public final Set<InetAddress> hosts = new HashSet<>();
-        public final Set<InetAddress> ignores = new HashSet<>();
-
-        LoaderOptions(File directory)
-        {
-            this.directory = directory;
-        }
-
-        public static LoaderOptions parseArgs(String cmdArgs[])
-        {
-            CommandLineParser parser = new GnuParser();
-            CmdLineOptions options = getCmdLineOptions();
-            try
-            {
-                CommandLine cmd = parser.parse(options, cmdArgs, false);
-
-                if (cmd.hasOption(HELP_OPTION))
-                {
-                    printUsage(options);
-                    System.exit(0);
-                }
-
-                String[] args = cmd.getArgs();
-                if (args.length == 0)
-                {
-                    System.err.println("Missing sstable directory argument");
-                    printUsage(options);
-                    System.exit(1);
-                }
-
-                if (args.length > 1)
-                {
-                    System.err.println("Too many arguments");
-                    printUsage(options);
-                    System.exit(1);
-                }
-
-                String dirname = args[0];
-                File dir = new File(dirname);
-
-                if (!dir.exists())
-                    errorMsg("Unknown directory: " + dirname, options);
-
-                if (!dir.isDirectory())
-                    errorMsg(dirname + " is not a directory", options);
-
-                LoaderOptions opts = new LoaderOptions(dir);
-
-                opts.verbose = cmd.hasOption(VERBOSE_OPTION);
-                opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
-
-                if (cmd.hasOption(NATIVE_PORT_OPTION))
-                    opts.nativePort = 
Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION));
-
-                if (cmd.hasOption(USER_OPTION))
-                    opts.user = cmd.getOptionValue(USER_OPTION);
-
-                if (cmd.hasOption(PASSWD_OPTION))
-                    opts.passwd = cmd.getOptionValue(PASSWD_OPTION);
-
-                if (cmd.hasOption(AUTH_PROVIDER_OPTION))
-                    opts.authProviderName = 
cmd.getOptionValue(AUTH_PROVIDER_OPTION);
-
-                if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION))
-                {
-                    String[] nodes = 
cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(",");
-                    try
-                    {
-                        for (String node : nodes)
-                        {
-                            opts.hosts.add(InetAddress.getByName(node.trim()));
-                        }
-                    }
-                    catch (UnknownHostException e)
-                    {
-                        errorMsg("Unknown host: " + e.getMessage(), options);
-                    }
-
-                }
-                else
-                {
-                    System.err.println("Initial hosts must be specified (-d)");
-                    printUsage(options);
-                    System.exit(1);
-                }
-
-                if (cmd.hasOption(IGNORE_NODES_OPTION))
-                {
-                    String[] nodes = 
cmd.getOptionValue(IGNORE_NODES_OPTION).split(",");
-                    try
-                    {
-                        for (String node : nodes)
-                        {
-                            
opts.ignores.add(InetAddress.getByName(node.trim()));
-                        }
-                    }
-                    catch (UnknownHostException e)
-                    {
-                        errorMsg("Unknown host: " + e.getMessage(), options);
-                    }
-                }
-
-                if (cmd.hasOption(CONNECTIONS_PER_HOST))
-                    opts.connectionsPerHost = 
Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST));
-
-                // try to load config file first, so that values can be 
rewritten with other option values.
-                // otherwise use default config.
-                Config config;
-                if (cmd.hasOption(CONFIG_PATH))
-                {
-                    File configFile = new 
File(cmd.getOptionValue(CONFIG_PATH));
-                    if (!configFile.exists())
-                    {
-                        errorMsg("Config file not found", options);
-                    }
-                    config = new 
YamlConfigurationLoader().loadConfig(configFile.toURI().toURL());
-                }
-                else
-                {
-                    config = new Config();
-                    // unthrottle stream by default
-                    config.stream_throughput_outbound_megabits_per_sec = 0;
-                    
config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
-                }
-                opts.storagePort = config.storage_port;
-                opts.sslStoragePort = config.ssl_storage_port;
-                opts.throttle = 
config.stream_throughput_outbound_megabits_per_sec;
-                opts.interDcThrottle = 
config.inter_dc_stream_throughput_outbound_megabits_per_sec;
-                opts.encOptions = config.client_encryption_options;
-                opts.serverEncOptions = config.server_encryption_options;
-
-                if (cmd.hasOption(THROTTLE_MBITS))
-                {
-                    opts.throttle = 
Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS));
-                }
-
-                if (cmd.hasOption(INTER_DC_THROTTLE_MBITS))
-                {
-                    opts.interDcThrottle = 
Integer.parseInt(cmd.getOptionValue(INTER_DC_THROTTLE_MBITS));
-                }
-
-                if (cmd.hasOption(SSL_TRUSTSTORE))
-                {
-                    opts.encOptions.truststore = 
cmd.getOptionValue(SSL_TRUSTSTORE);
-                }
-
-                if (cmd.hasOption(SSL_TRUSTSTORE_PW))
-                {
-                    opts.encOptions.truststore_password = 
cmd.getOptionValue(SSL_TRUSTSTORE_PW);
-                }
-
-                if (cmd.hasOption(SSL_KEYSTORE))
-                {
-                    opts.encOptions.keystore = 
cmd.getOptionValue(SSL_KEYSTORE);
-                    // if a keystore was provided, lets assume we'll need to 
use it
-                    opts.encOptions.require_client_auth = true;
-                }
-
-                if (cmd.hasOption(SSL_KEYSTORE_PW))
-                {
-                    opts.encOptions.keystore_password = 
cmd.getOptionValue(SSL_KEYSTORE_PW);
-                }
-
-                if (cmd.hasOption(SSL_PROTOCOL))
-                {
-                    opts.encOptions.protocol = 
cmd.getOptionValue(SSL_PROTOCOL);
-                }
-
-                if (cmd.hasOption(SSL_ALGORITHM))
-                {
-                    opts.encOptions.algorithm = 
cmd.getOptionValue(SSL_ALGORITHM);
-                }
-
-                if (cmd.hasOption(SSL_STORE_TYPE))
-                {
-                    opts.encOptions.store_type = 
cmd.getOptionValue(SSL_STORE_TYPE);
-                }
-
-                if (cmd.hasOption(SSL_CIPHER_SUITES))
-                {
-                    opts.encOptions.cipher_suites = 
cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
-                }
-
-                return opts;
-            }
-            catch (ParseException | ConfigurationException | 
MalformedURLException e)
-            {
-                errorMsg(e.getMessage(), options);
-                return null;
-            }
-        }
-
-        public LoaderOptions validateArguments()
-        {
-            // Both username and password need to be provided
-            if ((user != null) != (passwd != null))
-                errorMsg("Username and password must both be provided", 
getCmdLineOptions());
-
-            if (user != null)
-            {
-                // Support for 3rd party auth providers that support plain 
text credentials.
-                // In this case the auth provider must provide a constructor 
of the form:
-                //
-                // public MyAuthProvider(String username, String password)
-                if (authProviderName != null)
-                {
-                    try
-                    {
-                        Class authProviderClass = 
Class.forName(authProviderName);
-                        Constructor constructor = 
authProviderClass.getConstructor(String.class, String.class);
-                        authProvider = 
(AuthProvider)constructor.newInstance(user, passwd);
-                    }
-                    catch (ClassNotFoundException e)
-                    {
-                        errorMsg("Unknown auth provider: " + e.getMessage(), 
getCmdLineOptions());
-                    }
-                    catch (NoSuchMethodException e)
-                    {
-                        errorMsg("Auth provider does not support plain text 
credentials: " + e.getMessage(), getCmdLineOptions());
-                    }
-                    catch (InstantiationException | IllegalAccessException | 
IllegalArgumentException | InvocationTargetException e)
-                    {
-                        errorMsg("Could not create auth provider with plain 
text credentials: " + e.getMessage(), getCmdLineOptions());
-                    }
-                }
-                else
-                {
-                    // If a 3rd party auth provider wasn't provided use the 
driver plain text provider
-                    authProvider = new PlainTextAuthProvider(user, passwd);
-                }
-            }
-            // Alternate support for 3rd party auth providers that don't use 
plain text credentials.
-            // In this case the auth provider must provide a nullary 
constructor of the form:
-            //
-            // public MyAuthProvider()
-            else if (authProviderName != null)
-            {
-                try
-                {
-                    authProvider = 
(AuthProvider)Class.forName(authProviderName).newInstance();
-                }
-                catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e)
-                {
-                    errorMsg("Unknown auth provider" + e.getMessage(), 
getCmdLineOptions());
-                }
-            }
-
-            return this;
-        }
-
-        private static void errorMsg(String msg, CmdLineOptions options)
-        {
-            System.err.println(msg);
-            printUsage(options);
-            System.exit(1);
-        }
-
-        private static CmdLineOptions getCmdLineOptions()
-        {
-            CmdLineOptions options = new CmdLineOptions();
-            options.addOption("v",  VERBOSE_OPTION,      "verbose output");
-            options.addOption("h",  HELP_OPTION,         "display this help 
message");
-            options.addOption(null, NOPROGRESS_OPTION,   "don't display 
progress");
-            options.addOption("i",  IGNORE_NODES_OPTION, "NODES", "don't 
stream to this (comma separated) list of nodes");
-            options.addOption("d",  INITIAL_HOST_ADDRESS_OPTION, "initial 
hosts", "Required. try to connect to these hosts (comma separated) initially 
for ring information");
-            options.addOption("p",  NATIVE_PORT_OPTION, "rpc port", "port used 
for native connection (default 9042)");
-            options.addOption("t",  THROTTLE_MBITS, "throttle", "throttle 
speed in Mbits (default unlimited)");
-            options.addOption("idct",  INTER_DC_THROTTLE_MBITS, 
"inter-dc-throttle", "inter-datacenter throttle speed in Mbits (default 
unlimited)");
-            options.addOption("u",  USER_OPTION, "username", "username for 
cassandra authentication");
-            options.addOption("pw", PASSWD_OPTION, "password", "password for 
cassandra authentication");
-            options.addOption("ap", AUTH_PROVIDER_OPTION, "auth provider", 
"custom AuthProvider class name for cassandra authentication");
-            options.addOption("cph", CONNECTIONS_PER_HOST, 
"connectionsPerHost", "number of concurrent connections-per-host.");
-            // ssl connection-related options
-            options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "Client SSL: 
full path to truststore");
-            options.addOption("tspw", SSL_TRUSTSTORE_PW, 
"TRUSTSTORE-PASSWORD", "Client SSL: password of the truststore");
-            options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "Client SSL: 
full path to keystore");
-            options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", 
"Client SSL: password of the keystore");
-            options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "Client SSL: 
connections protocol to use (default: TLS)");
-            options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "Client SSL: 
algorithm (default: SunX509)");
-            options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: 
type of store");
-            options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", 
"Client SSL: comma-separated list of encryption suites to use");
-            options.addOption("f", CONFIG_PATH, "path to config file", 
"cassandra.yaml file path for streaming throughput and client/server SSL.");
-            return options;
-        }
-
-        public static void printUsage(Options options)
-        {
-            String usage = String.format("%s [options] <dir_path>", TOOL_NAME);
-            String header = System.lineSeparator() +
-                            "Bulk load the sstables found in the directory 
<dir_path> to the configured cluster." +
-                            "The parent directories of <dir_path> are used as 
the target keyspace/table name. " +
-                            "So for instance, to load an sstable named 
Standard1-g-1-Data.db into Keyspace1/Standard1, " +
-                            "you will need to have the files 
Standard1-g-1-Data.db and Standard1-g-1-Index.db into a directory 
/path/to/Keyspace1/Standard1/.";
-            String footer = System.lineSeparator() +
-                            "You can provide cassandra.yaml file with -f 
command line option to set up streaming throughput, client and server 
encryption options. " +
-                            "Only stream_throughput_outbound_megabits_per_sec, 
inter_dc_stream_throughput_outbound_megabits_per_sec, server_encryption_options 
and client_encryption_options are read from yaml. " +
-                            "You can override options read from cassandra.yaml 
with corresponding command line options.";
-            new HelpFormatter().printHelp(usage, header, options, footer);
-        }
-    }
-
     public static class CmdLineOptions extends Options
     {
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d15a6da/src/java/org/apache/cassandra/tools/LoaderOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java
new file mode 100644
index 0000000..6a6e575
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -0,0 +1,537 @@
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.*;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+
+import com.datastax.driver.core.AuthProvider;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import org.apache.commons.cli.*;
+
+public class LoaderOptions
+{
+
+    public static final String HELP_OPTION = "help";
+    public static final String VERBOSE_OPTION = "verbose";
+    public static final String NOPROGRESS_OPTION = "no-progress";
+    public static final String NATIVE_PORT_OPTION = "port";
+    public static final String USER_OPTION = "username";
+    public static final String PASSWD_OPTION = "password";
+    public static final String AUTH_PROVIDER_OPTION = "auth-provider";
+    public static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
+    public static final String IGNORE_NODES_OPTION = "ignore";
+    public static final String CONNECTIONS_PER_HOST = "connections-per-host";
+    public static final String CONFIG_PATH = "conf-path";
+    public static final String THROTTLE_MBITS = "throttle";
+    public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
+    public static final String TOOL_NAME = "sstableloader";
+
+    /* client encryption options */
+    public static final String SSL_TRUSTSTORE = "truststore";
+    public static final String SSL_TRUSTSTORE_PW = "truststore-password";
+    public static final String SSL_KEYSTORE = "keystore";
+    public static final String SSL_KEYSTORE_PW = "keystore-password";
+    public static final String SSL_PROTOCOL = "ssl-protocol";
+    public static final String SSL_ALGORITHM = "ssl-alg";
+    public static final String SSL_STORE_TYPE = "store-type";
+    public static final String SSL_CIPHER_SUITES = "ssl-ciphers";
+
+    public final File directory;
+    public final boolean debug;
+    public final boolean verbose;
+    public final boolean noProgress;
+    public final int nativePort;
+    public final String user;
+    public final String passwd;
+    public final AuthProvider authProvider;
+    public final int throttle;
+    public final int interDcThrottle;
+    public final int storagePort;
+    public final int sslStoragePort;
+    public final EncryptionOptions encOptions;
+    public final int connectionsPerHost;
+    public final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
+    public final Set<InetAddress> hosts;
+    public final Set<InetAddress> ignores = new HashSet<>();
+
+    /**
+     * Copies every setting collected by {@code builder} into this instance's final fields.
+     * Object-valued settings (directory, encryption options, hosts, ...) are taken by
+     * reference, not deep-copied.
+     *
+     * @param builder the builder whose current values are used
+     */
+    LoaderOptions(Builder builder)
+    {
+        directory = builder.directory;
+        debug = builder.debug;
+        verbose = builder.verbose;
+        noProgress = builder.noProgress;
+        nativePort = builder.nativePort;
+        user = builder.user;
+        passwd = builder.passwd;
+        authProvider = builder.authProvider;
+        throttle = builder.throttle;
+        interDcThrottle = builder.interDcThrottle;
+        storagePort = builder.storagePort;
+        sslStoragePort = builder.sslStoragePort;
+        encOptions = builder.encOptions;
+        connectionsPerHost = builder.connectionsPerHost;
+        serverEncOptions = builder.serverEncOptions;
+        hosts = builder.hosts;
+        // Carry over the nodes to skip; without this the "ignore" option and
+        // Builder.ignore(...) have no effect because this set would always stay empty.
+        // The null guard covers a caller having passed null to Builder.ignore(Set).
+        if (builder.ignores != null)
+            ignores.addAll(builder.ignores);
+    }
+
+    static class Builder
+    {
+        File directory;
+        boolean debug;
+        boolean verbose;
+        boolean noProgress;
+        int nativePort = 9042;
+        String user;
+        String passwd;
+        String authProviderName;
+        AuthProvider authProvider;
+        int throttle = 0;
+        int interDcThrottle = 0;
+        int storagePort;
+        int sslStoragePort;
+        EncryptionOptions encOptions = new 
EncryptionOptions.ClientEncryptionOptions();
+        int connectionsPerHost = 1;
+        EncryptionOptions.ServerEncryptionOptions serverEncOptions = new 
EncryptionOptions.ServerEncryptionOptions();
+        Set<InetAddress> hosts = new HashSet<>();
+        Set<InetAddress> ignores = new HashSet<>();
+
+        Builder()
+        {
+            // Nothing to do: defaults come from the field initializers above.
+            // Instances are obtained through LoaderOptions.builder().
+        }
+
+        /**
+         * Resolves the auth provider from the collected username/password and provider
+         * class name, then creates a {@link LoaderOptions} from this builder's current state.
+         *
+         * @return a new LoaderOptions populated from this builder
+         */
+        public LoaderOptions build()
+        {
+            constructAuthProvider();
+            return new LoaderOptions(this);
+        }
+
+        /** Sets the sstable directory to load from; returns this builder for chaining. */
+        public Builder directory(File directory)
+        {
+            this.directory = directory;
+            return this;
+        }
+
+        /** Sets the debug flag; returns this builder for chaining. */
+        public Builder debug(boolean debug)
+        {
+            this.debug = debug;
+            return this;
+        }
+
+        /** Sets the verbose-output flag; returns this builder for chaining. */
+        public Builder verbose(boolean verbose)
+        {
+            this.verbose = verbose;
+            return this;
+        }
+
+        /** Sets whether progress display is suppressed; returns this builder for chaining. */
+        public Builder noProgress(boolean noProgress)
+        {
+            this.noProgress = noProgress;
+            return this;
+        }
+
+        /** Sets the port used for the native connection (field default is 9042); returns this builder. */
+        public Builder nativePort(int nativePort)
+        {
+            this.nativePort = nativePort;
+            return this;
+        }
+
+        /**
+         * Sets the username for authentication; returns this builder for chaining.
+         * build() requires username and password to be either both set or both unset.
+         */
+        public Builder user(String user)
+        {
+            this.user = user;
+            return this;
+        }
+
+        /**
+         * Sets the password for authentication; returns this builder for chaining.
+         * build() requires username and password to be either both set or both unset.
+         */
+        public Builder password(String passwd)
+        {
+            this.passwd = passwd;
+            return this;
+        }
+
+        /**
+         * Sets the auth provider instance directly; returns this builder for chaining.
+         * Note that build() replaces this value when a username/password or an auth
+         * provider class name has been set on the builder.
+         */
+        public Builder authProvider(AuthProvider authProvider)
+        {
+            this.authProvider = authProvider;
+            return this;
+        }
+
+        /** Sets the streaming throttle (Mbits per the CLI help; field default is 0); returns this builder. */
+        public Builder throttle(int throttle)
+        {
+            this.throttle = throttle;
+            return this;
+        }
+
+        /** Sets the inter-datacenter throttle (Mbits per the CLI help; field default is 0); returns this builder. */
+        public Builder interDcThrottle(int interDcThrottle)
+        {
+            this.interDcThrottle = interDcThrottle;
+            return this;
+        }
+
+        /** Sets the storage port; returns this builder for chaining. */
+        public Builder storagePort(int storagePort)
+        {
+            this.storagePort = storagePort;
+            return this;
+        }
+
+        /** Sets the SSL storage port; returns this builder for chaining. */
+        public Builder sslStoragePort(int sslStoragePort)
+        {
+            this.sslStoragePort = sslStoragePort;
+            return this;
+        }
+
+        /** Replaces the client-side encryption options object; returns this builder for chaining. */
+        public Builder encOptions(EncryptionOptions encOptions)
+        {
+            this.encOptions = encOptions;
+            return this;
+        }
+
+        /** Sets the number of concurrent connections per host (field default is 1); returns this builder. */
+        public Builder connectionsPerHost(int connectionsPerHost)
+        {
+            this.connectionsPerHost = connectionsPerHost;
+            return this;
+        }
+
+        /** Replaces the server encryption options object; returns this builder for chaining. */
+        public Builder 
serverEncOptions(EncryptionOptions.ServerEncryptionOptions serverEncOptions)
+        {
+            this.serverEncOptions = serverEncOptions;
+            return this;
+        }
+
+        /**
+         * Replaces the set of initial hosts; returns this builder for chaining.
+         * The given set is stored by reference (not copied), so later host(...) calls
+         * and parseArgs(...) add to the caller's set.
+         */
+        public Builder hosts(Set<InetAddress> hosts)
+        {
+            this.hosts = hosts;
+            return this;
+        }
+
+        /** Adds one address to the current set of initial hosts; returns this builder for chaining. */
+        public Builder host(InetAddress host)
+        {
+            hosts.add(host);
+            return this;
+        }
+
+        /**
+         * Replaces the builder's set of nodes to ignore; returns this builder for chaining.
+         * The given set is stored by reference (not copied).
+         */
+        public Builder ignore(Set<InetAddress> ignores)
+        {
+            this.ignores = ignores;
+            return this;
+        }
+
+        /** Adds one address to the builder's set of nodes to ignore; returns this builder for chaining. */
+        public Builder ignore(InetAddress ignore)
+        {
+            ignores.add(ignore);
+            return this;
+        }
+
+        /**
+         * Fills this builder from the sstableloader command line.
+         * <p>
+         * Exactly one positional argument (an existing directory) and the initial hosts option
+         * are required. Storage ports, stream throttle and encryption options are first taken
+         * from the cassandra.yaml named by the config path option, or from a default
+         * {@link Config} when none is given, and then overridden by explicit options.
+         * <p>
+         * This method is CLI-oriented: when help is requested or an argument is invalid
+         * (including a non-numeric value for a numeric option) it prints usage and terminates
+         * the JVM via {@link System#exit(int)}.
+         *
+         * @param cmdArgs raw command line arguments
+         * @return this builder; the {@code null} returns in the catch blocks only satisfy the
+         *         compiler, because errorMsg exits before they are reached
+         */
+        public Builder parseArgs(String cmdArgs[])
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length == 0)
+                {
+                    System.err.println("Missing sstable directory argument");
+                    printUsage(options);
+                    System.exit(1);
+                }
+
+                if (args.length > 1)
+                {
+                    System.err.println("Too many arguments");
+                    printUsage(options);
+                    System.exit(1);
+                }
+
+                String dirname = args[0];
+                File dir = new File(dirname);
+
+                if (!dir.exists())
+                {
+                    errorMsg("Unknown directory: " + dirname, options);
+                }
+
+                if (!dir.isDirectory())
+                {
+                    errorMsg(dirname + " is not a directory", options);
+                }
+
+                directory = dir;
+
+                verbose = cmd.hasOption(VERBOSE_OPTION);
+                noProgress = cmd.hasOption(NOPROGRESS_OPTION);
+
+                if (cmd.hasOption(NATIVE_PORT_OPTION))
+                {
+                    nativePort = Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION));
+                }
+
+                if (cmd.hasOption(USER_OPTION))
+                {
+                    user = cmd.getOptionValue(USER_OPTION);
+                }
+
+                if (cmd.hasOption(PASSWD_OPTION))
+                {
+                    passwd = cmd.getOptionValue(PASSWD_OPTION);
+                }
+
+                if (cmd.hasOption(AUTH_PROVIDER_OPTION))
+                {
+                    authProviderName = cmd.getOptionValue(AUTH_PROVIDER_OPTION);
+                }
+
+                if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION))
+                {
+                    String[] nodes = cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(",");
+                    try
+                    {
+                        for (String node : nodes)
+                        {
+                            hosts.add(InetAddress.getByName(node.trim()));
+                        }
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        errorMsg("Unknown host: " + e.getMessage(), options);
+                    }
+                }
+                else
+                {
+                    System.err.println("Initial hosts must be specified (-d)");
+                    printUsage(options);
+                    System.exit(1);
+                }
+
+                if (cmd.hasOption(IGNORE_NODES_OPTION))
+                {
+                    String[] nodes = cmd.getOptionValue(IGNORE_NODES_OPTION).split(",");
+                    try
+                    {
+                        for (String node : nodes)
+                        {
+                            ignores.add(InetAddress.getByName(node.trim()));
+                        }
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        errorMsg("Unknown host: " + e.getMessage(), options);
+                    }
+                }
+
+                if (cmd.hasOption(CONNECTIONS_PER_HOST))
+                {
+                    connectionsPerHost = Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST));
+                }
+
+                // try to load config file first, so that values can be
+                // rewritten with other option values.
+                // otherwise use default config.
+                Config config;
+                if (cmd.hasOption(CONFIG_PATH))
+                {
+                    File configFile = new File(cmd.getOptionValue(CONFIG_PATH));
+                    if (!configFile.exists())
+                    {
+                        errorMsg("Config file not found", options);
+                    }
+                    config = new YamlConfigurationLoader().loadConfig(configFile.toURI().toURL());
+                }
+                else
+                {
+                    config = new Config();
+                    // unthrottle stream by default
+                    config.stream_throughput_outbound_megabits_per_sec = 0;
+                    config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
+                }
+                storagePort = config.storage_port;
+                sslStoragePort = config.ssl_storage_port;
+                throttle = config.stream_throughput_outbound_megabits_per_sec;
+                // NOTE(review): the inter-DC throttle is not taken from config here; it is only
+                // set by its command line option below. Confirm this is intended.
+                encOptions = config.client_encryption_options;
+                serverEncOptions = config.server_encryption_options;
+
+                if (cmd.hasOption(THROTTLE_MBITS))
+                {
+                    throttle = Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS));
+                }
+
+                if (cmd.hasOption(INTER_DC_THROTTLE_MBITS))
+                {
+                    interDcThrottle = Integer.parseInt(cmd.getOptionValue(INTER_DC_THROTTLE_MBITS));
+                }
+
+                if (cmd.hasOption(SSL_TRUSTSTORE))
+                {
+                    encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
+                }
+
+                if (cmd.hasOption(SSL_TRUSTSTORE_PW))
+                {
+                    encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW);
+                }
+
+                if (cmd.hasOption(SSL_KEYSTORE))
+                {
+                    encOptions.keystore = cmd.getOptionValue(SSL_KEYSTORE);
+                    // if a keystore was provided, lets assume we'll need to use it
+                    encOptions.require_client_auth = true;
+                }
+
+                if (cmd.hasOption(SSL_KEYSTORE_PW))
+                {
+                    encOptions.keystore_password = cmd.getOptionValue(SSL_KEYSTORE_PW);
+                }
+
+                if (cmd.hasOption(SSL_PROTOCOL))
+                {
+                    encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL);
+                }
+
+                if (cmd.hasOption(SSL_ALGORITHM))
+                {
+                    encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM);
+                }
+
+                if (cmd.hasOption(SSL_STORE_TYPE))
+                {
+                    encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE);
+                }
+
+                if (cmd.hasOption(SSL_CIPHER_SUITES))
+                {
+                    encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
+                }
+
+                return this;
+            }
+            catch (NumberFormatException e)
+            {
+                // A numeric option (port, connections-per-host, throttle, inter-dc-throttle)
+                // was given a non-integer value: report it like any other bad argument
+                // instead of letting the exception escape with a stack trace.
+                errorMsg("Invalid numeric option value: " + e.getMessage(), options);
+                return null;
+            }
+            catch (ParseException | ConfigurationException | MalformedURLException e)
+            {
+                errorMsg(e.getMessage(), options);
+                return null;
+            }
+        }
+
+        /**
+         * Derives {@code authProvider} from the username/password and the optional auth
+         * provider class name collected by this builder.
+         * <ul>
+         * <li>username and password set, class name set: the class is instantiated through
+         *     its {@code (String, String)} constructor;</li>
+         * <li>username and password set, no class name: a {@link PlainTextAuthProvider} is used;</li>
+         * <li>no credentials, class name set: the class is instantiated through its no-arg
+         *     constructor;</li>
+         * <li>neither set: {@code authProvider} is left untouched.</li>
+         * </ul>
+         * Any failure is reported through errorMsg, which prints usage and exits the JVM.
+         */
+        private void constructAuthProvider()
+        {
+            // Both username and password need to be provided
+            if ((user != null) != (passwd != null))
+                errorMsg("Username and password must both be provided", getCmdLineOptions());
+
+            if (user != null)
+            {
+                // Support for 3rd party auth providers that support plain text credentials.
+                // In this case the auth provider must provide a constructor of the form:
+                //
+                // public MyAuthProvider(String username, String password)
+                if (authProviderName != null)
+                {
+                    try
+                    {
+                        Class<?> authProviderClass = Class.forName(authProviderName);
+                        Constructor<?> constructor = authProviderClass.getConstructor(String.class, String.class);
+                        authProvider = (AuthProvider) constructor.newInstance(user, passwd);
+                    }
+                    catch (ClassNotFoundException e)
+                    {
+                        errorMsg("Unknown auth provider: " + e.getMessage(), getCmdLineOptions());
+                    }
+                    catch (NoSuchMethodException e)
+                    {
+                        errorMsg("Auth provider does not support plain text credentials: " + e.getMessage(), getCmdLineOptions());
+                    }
+                    catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e)
+                    {
+                        errorMsg("Could not create auth provider with plain text credentials: " + e.getMessage(), getCmdLineOptions());
+                    }
+                }
+                else
+                {
+                    // If a 3rd party auth provider wasn't provided use the driver plain text provider
+                    this.authProvider = new PlainTextAuthProvider(user, passwd);
+                }
+            }
+            // Alternate support for 3rd party auth providers that don't use plain text credentials.
+            // In this case the auth provider must provide a nullary constructor of the form:
+            //
+            // public MyAuthProvider()
+            else if (authProviderName != null)
+            {
+                try
+                {
+                    authProvider = (AuthProvider) Class.forName(authProviderName).newInstance();
+                }
+                catch (ClassNotFoundException e)
+                {
+                    errorMsg("Unknown auth provider: " + e.getMessage(), getCmdLineOptions());
+                }
+                catch (InstantiationException | IllegalAccessException e)
+                {
+                    // The class exists but could not be instantiated, which is a different
+                    // problem from an unknown class name.
+                    errorMsg("Could not create auth provider: " + e.getMessage(), getCmdLineOptions());
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a new, empty {@link Builder} holding the default option values.
+     *
+     * @return a fresh builder
+     */
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /**
+     * Reports a fatal usage error: prints {@code msg} to stderr, prints the usage text,
+     * then terminates the JVM with exit status 1. This method does not return.
+     *
+     * @param msg     the error message to print
+     * @param options the options used to render the usage text
+     */
+    private static void errorMsg(String msg, CmdLineOptions options)
+    {
+        System.err.println(msg);
+        printUsage(options);
+        System.exit(1);
+    }
+
+    /**
+     * Builds the set of command line options understood by sstableloader.
+     * A new {@link CmdLineOptions} instance is created on every call.
+     *
+     * @return the option definitions used for parsing and for the usage text
+     */
+    private static CmdLineOptions getCmdLineOptions()
+    {
+        CmdLineOptions options = new CmdLineOptions();
+        options.addOption("v", VERBOSE_OPTION, "verbose output");
+        options.addOption("h", HELP_OPTION, "display this help message");
+        options.addOption(null, NOPROGRESS_OPTION, "don't display progress");
+        options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes");
+        options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. try to connect to these hosts (comma separated) initially for ring information");
+        // The placeholder names the native port: this option feeds nativePort, not an RPC port.
+        options.addOption("p", NATIVE_PORT_OPTION, "native port", "port used for native connection (default 9042)");
+        options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
+        options.addOption("idct", INTER_DC_THROTTLE_MBITS, "inter-dc-throttle", "inter-datacenter throttle speed in Mbits (default unlimited)");
+        options.addOption("u", USER_OPTION, "username", "username for cassandra authentication");
+        options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
+        options.addOption("ap", AUTH_PROVIDER_OPTION, "auth provider", "custom AuthProvider class name for cassandra authentication");
+        options.addOption("cph", CONNECTIONS_PER_HOST, "connectionsPerHost", "number of concurrent connections-per-host.");
+        // ssl connection-related options
+        options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "Client SSL: full path to truststore");
+        options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "Client SSL: password of the truststore");
+        options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "Client SSL: full path to keystore");
+        options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "Client SSL: password of the keystore");
+        options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "Client SSL: connections protocol to use (default: TLS)");
+        options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "Client SSL: algorithm (default: SunX509)");
+        options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: type of store");
+        options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use");
+        options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL.");
+        return options;
+    }
+
+    /**
+     * Prints the sstableloader help: the usage line, an explanatory header, the given
+     * options, and a footer describing how cassandra.yaml settings are used.
+     *
+     * @param options the option definitions to list in the help output
+     */
+    public static void printUsage(Options options)
+    {
+        String usage = String.format("%s [options] <dir_path>", TOOL_NAME);
+        // Each fragment ends with a space so the concatenated sentences do not run together.
+        String header = System.lineSeparator() +
+                "Bulk load the sstables found in the directory <dir_path> to the configured cluster. " +
+                "The parent directories of <dir_path> are used as the target keyspace/table name. " +
+                "So for instance, to load an sstable named Standard1-g-1-Data.db into Keyspace1/Standard1, " +
+                "you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db into a directory /path/to/Keyspace1/Standard1/.";
+        String footer = System.lineSeparator() +
+                "You can provide cassandra.yaml file with -f command line option to set up streaming throughput, client and server encryption options. " +
+                "Only stream_throughput_outbound_megabits_per_sec, server_encryption_options and client_encryption_options are read from yaml. " +
+                "You can override options read from cassandra.yaml with corresponding command line options.";
+        new HelpFormatter().printHelp(usage, header, options, footer);
+    }
+}
\ No newline at end of file

Reply via email to