Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 ae5108645 -> c7b407357


(Pig) support BulkOutputFormat as a URL parameter

patch by Alex Liu; reviewed by Piotr Kołaczkowski for CASSANDRA-7410


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7b40735
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7b40735
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7b40735

Branch: refs/heads/cassandra-2.1
Commit: c7b40735789c840529002eb3c11d8731f460d61c
Parents: ae51086
Author: Alex Liu <alex_li...@yahoo.com>
Authored: Tue Sep 15 16:06:18 2015 +0100
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Sep 15 16:08:54 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../hadoop/cql3/CqlBulkOutputFormat.java        |  93 +++++++-
 .../hadoop/cql3/CqlBulkRecordWriter.java        |  87 ++++----
 .../cassandra/hadoop/pig/CqlNativeStorage.java  | 213 +++++++++++++------
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   1 -
 .../org/apache/cassandra/tools/BulkLoader.java  |   2 +-
 test/conf/cassandra.yaml                        |   1 +
 .../org/apache/cassandra/pig/CqlTableTest.java  |  36 ++++
 .../org/apache/cassandra/pig/PigTestBase.java   |   3 +-
 9 files changed, 336 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dff47fc..5f11049 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.10
+ * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
  * BATCH statement is broken in cqlsh (CASSANDRA-10272)
  * Added configurable warning threshold for GC duration (CASSANDRA-8907)
  * (cqlsh) Make cqlsh PEP8 compliant (CASSANDRA-10066)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 887fe8e..7fedb41 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.hadoop.conf.Configuration;
@@ -54,6 +55,16 @@ public class CqlBulkOutputFormat extends 
AbstractBulkOutputFormat<Object, List<B
     private static final String OUTPUT_CQL_SCHEMA_PREFIX = 
"cassandra.columnfamily.schema.";
     private static final String OUTPUT_CQL_INSERT_PREFIX = 
"cassandra.columnfamily.insert.";
     private static final String DELETE_SOURCE = 
"cassandra.output.delete.source";
+    private static final String OUTPUT_CQL_STORAGE_PORT = 
"cassandra.storage.port";
+    private static final String OUTPUT_CQL_SSL_STORAGE_PORT = 
"cassandra.ssl.storage.port";
+    private static final String INTERNODE_ENCRYPTION = 
"cassandra.internode.encryption";
+    private static final String SERVER_KEYSTORE = "cassandra.server.keystore";
+    private static final String SERVER_KEYSTORE_PASSWORD = 
"cassandra.server.keystore.password";
+    private static final String SERVER_TRUSTSTORE = 
"cassandra.server.truststore";
+    private static final String SERVER_TRUSTSTORE_PASSWORD = 
"cassandra.server.truststore.password";
+    // Cipher suites need their own key; sharing the truststore-password key
+    // would make the two settings overwrite each other in the Configuration.
+    private static final String SERVER_CIPHER_SUITES = "cassandra.server.cipher.suites";
+    public static final int DEFAULT_STORAGE_PORT = 7000;
+    public static final int DEFAULT_SSL_STORAGE_PORT = 7001;
   
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
@@ -84,7 +95,87 @@ public class CqlBulkOutputFormat extends 
AbstractBulkOutputFormat<Object, List<B
     {
         conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
     }
-    
+
+    /** Records the storage port in {@code conf}, written as a decimal string. */
+    public static void setStoragePort(Configuration conf, int port)
+    {
+        conf.set(OUTPUT_CQL_STORAGE_PORT, Integer.toString(port));
+    }
+
+    /** Records the SSL storage port in {@code conf}, written as a decimal string. */
+    public static void setSSLStoragePort(Configuration conf, int port)
+    {
+        conf.set(OUTPUT_CQL_SSL_STORAGE_PORT, Integer.toString(port));
+    }
+
+    /** Records the internode encryption mode name in {@code conf}. */
+    public static void setInternodeEncryption(Configuration conf, String encrypt)
+    {
+        conf.set(INTERNODE_ENCRYPTION, encrypt);
+    }
+
+    /** Records the server keystore setting in {@code conf}. */
+    public static void setServerKeystore(Configuration conf, String keystore)
+    {
+        conf.set(SERVER_KEYSTORE, keystore);
+    }
+
+    /** Records the server keystore password in {@code conf}. */
+    public static void setServerKeystorePassword(Configuration conf, String keystorePass)
+    {
+        conf.set(SERVER_KEYSTORE_PASSWORD, keystorePass);
+    }
+
+    /** Records the server truststore setting in {@code conf}. */
+    public static void setServerTruststore(Configuration conf, String truststore)
+    {
+        conf.set(SERVER_TRUSTSTORE, truststore);
+    }
+
+    /** Records the server truststore password in {@code conf}. */
+    public static void setServerTruststorePassword(Configuration conf, String truststorePass)
+    {
+        conf.set(SERVER_TRUSTSTORE_PASSWORD, truststorePass);
+    }
+
+    /** Records the server cipher suites (a comma-separated list) in {@code conf}. */
+    public static void setServerCipherSuites(Configuration conf, String cipherSuites)
+    {
+        conf.set(SERVER_CIPHER_SUITES, cipherSuites);
+    }
+
+    /** @return the configured storage port, or {@link #DEFAULT_STORAGE_PORT} when none is set */
+    public static int getStoragePort(Configuration conf)
+    {
+        return conf.getInt(OUTPUT_CQL_STORAGE_PORT, DEFAULT_STORAGE_PORT);
+    }
+
+    /** @return the configured SSL storage port, or {@link #DEFAULT_SSL_STORAGE_PORT} when none is set */
+    public static int getSSLStoragePort(Configuration conf)
+    {
+        return conf.getInt(OUTPUT_CQL_SSL_STORAGE_PORT, DEFAULT_SSL_STORAGE_PORT);
+    }
+
+    /** @return the configured internode encryption mode name, defaulting to the name of {@code InternodeEncryption.none} */
+    public static String getInternodeEncryption(Configuration conf)
+    {
+        String fallback = EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none.name();
+        return conf.get(INTERNODE_ENCRYPTION, fallback);
+    }
+
+    /** @return the server keystore setting as stored in {@code conf}; no default is supplied here */
+    public static String getServerKeystore(Configuration conf)
+    {
+        return conf.get(SERVER_KEYSTORE);
+    }
+
+    /** @return the server truststore setting as stored in {@code conf}; no default is supplied here */
+    public static String getServerTruststore(Configuration conf)
+    {
+        return conf.get(SERVER_TRUSTSTORE);
+    }
+
+    /** @return the server keystore password as stored in {@code conf}; no default is supplied here */
+    public static String getServerKeystorePassword(Configuration conf)
+    {
+        return conf.get(SERVER_KEYSTORE_PASSWORD);
+    }
+
+    /** @return the server truststore password as stored in {@code conf}; no default is supplied here */
+    public static String getServerTruststorePassword(Configuration conf)
+    {
+        return conf.get(SERVER_TRUSTSTORE_PASSWORD);
+    }
+
+    /** @return the server cipher suites string as stored in {@code conf}; no default is supplied here */
+    public static String getServerCipherSuites(Configuration conf)
+    {
+        return conf.get(SERVER_CIPHER_SUITES);
+    }
+
     public static String getColumnFamilySchema(Configuration conf, String 
columnFamily)
     {
         String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index e60a240..ced8aa9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -19,13 +19,16 @@ package org.apache.cassandra.hadoop.cql3;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
 import org.apache.cassandra.hadoop.BulkRecordWriter;
@@ -35,6 +38,9 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.thrift.ITransportFactory;
+import org.apache.cassandra.tools.BulkLoader;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
@@ -108,10 +114,7 @@ public class CqlBulkRecordWriter extends 
AbstractBulkRecordWriter<Object, List<B
             }
             if (loader == null)
             {
-                ExternalClient externalClient = new ExternalClient(conf);
-                
-                externalClient.addKnownCfs(keyspace, schema);
-
+                BulkLoader.ExternalClient externalClient = 
getExternalClient(conf);
                 this.loader = new SSTableLoader(outputDir, externalClient, new 
BulkRecordWriter.NullOutputHandler()) {
                     @Override
                     public void onSuccess(StreamState finalState)
@@ -171,41 +174,53 @@ public class CqlBulkRecordWriter extends 
AbstractBulkRecordWriter<Object, List<B
         
         return dir;
     }
-    
-    public static class ExternalClient extends 
AbstractBulkRecordWriter.ExternalClient
-    {
-        private Map<String, Map<String, CFMetaData>> knownCqlCfs = new 
HashMap<>();
-        
-        public ExternalClient(Configuration conf)
-        {
-            super(conf);
-        }
 
-        public void addKnownCfs(String keyspace, String cql)
+    private BulkLoader.ExternalClient getExternalClient(Configuration conf)
+    {
+        Set<InetAddress> hosts = new HashSet<InetAddress>();
+        String outputAddress = ConfigHelper.getOutputInitialAddress(conf);
+        if (outputAddress == null) outputAddress = "localhost";
+        String[] nodes = outputAddress.split(",");
+        for (String node : nodes)
         {
-            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-            
-            if (cfs == null)
+            try
             {
-                cfs = new HashMap<>();
-                knownCqlCfs.put(keyspace, cfs);
+                hosts.add(InetAddress.getByName(node));
             }
-            
-            CFMetaData metadata = CFMetaData.compile(cql, keyspace);
-            cfs.put(metadata.cfName, metadata);
-        }
-        
-        @Override
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
-        {
-            CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
-            if (metadata != null)
+            catch (UnknownHostException e)
             {
-                return metadata;
+                throw new RuntimeException(e);
             }
-            
-            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-            return cfs != null ? cfs.get(cfName) : null;
         }
+        int rpcPort = ConfigHelper.getOutputRpcPort(conf);
+        String username = ConfigHelper.getOutputKeyspaceUserName(conf);
+        String password = ConfigHelper.getOutputKeyspacePassword(conf);
+        ITransportFactory transportFactory = 
ConfigHelper.getClientTransportFactory(conf);
+        return new BulkLoader.ExternalClient(hosts,
+                rpcPort,
+                username,
+                password,
+                transportFactory,
+                CqlBulkOutputFormat.getStoragePort(conf),
+                CqlBulkOutputFormat.getSSLStoragePort(conf),
+                getServerEncryptOpt(conf));
+    }
+
+    /**
+     * Builds the server encryption options handed to the bulk-load client
+     * from the values stored in the job configuration.
+     *
+     * @param conf job configuration holding the CqlBulkOutputFormat settings
+     * @return freshly created options; left untouched when no encryption
+     *         mode is configured
+     * @throws IllegalArgumentException if the configured mode is not the name
+     *         of an InternodeEncryption constant (ignoring case)
+     */
+    private ServerEncryptionOptions getServerEncryptOpt(Configuration conf)
+    {
+        ServerEncryptionOptions encryptOpt = new ServerEncryptionOptions();
+        String internodeEncrypt = CqlBulkOutputFormat.getInternodeEncryption(conf);
+        if (StringUtils.isEmpty(internodeEncrypt))
+            return encryptOpt;
+
+        // Constant names such as "none" are lower case; normalise the value so
+        // that "NONE" or "All" are accepted, matching CqlNativeStorage's
+        // handling of its internode_encrypt URL parameter.
+        encryptOpt.internode_encryption = EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.valueOf(internodeEncrypt.toLowerCase());
+        encryptOpt.keystore = CqlBulkOutputFormat.getServerKeystore(conf);
+        encryptOpt.truststore = CqlBulkOutputFormat.getServerTruststore(conf);
+        encryptOpt.keystore_password = CqlBulkOutputFormat.getServerKeystorePassword(conf);
+        encryptOpt.truststore_password = CqlBulkOutputFormat.getServerTruststorePassword(conf);
+        String cipherSuites = CqlBulkOutputFormat.getServerCipherSuites(conf);
+        // cipher suites arrive as one comma-separated string; spaces are dropped before splitting
+        if (!StringUtils.isEmpty(cipherSuites))
+            encryptOpt.cipher_suites = cipherSuites.replace(" ", "").split(",");
+        return encryptOpt;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java 
b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 7887085..5287bf5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -24,18 +24,21 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import 
org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
 import org.apache.cassandra.db.BufferCell;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.utils.*;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.pig.Expression;
 import org.apache.pig.ResourceSchema;
@@ -54,6 +57,7 @@ import com.datastax.driver.core.Row;
 public class CqlNativeStorage extends AbstractCassandraStorage
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CqlNativeStorage.class);
+    public static String BULK_OUTPUT_FORMAT = 
"org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
     private int pageSize = 1000;
     private String columns;
     private String outputQuery;
@@ -83,6 +87,22 @@ public class CqlNativeStorage extends 
AbstractCassandraStorage
     private String nativeSSLCipherSuites;
     private String inputCql;
 
+    private boolean bulkOutputFormat = false;
+    private String bulkCfSchema;
+    private String bulkInsertStatement;
+    private String bulkOutputLocation;
+    private int bulkBuffSize = -1;
+    private int bulkStreamThrottle = -1;
+    private int bulkMaxFailedHosts = -1;
+    private int storagePort = CqlBulkOutputFormat.DEFAULT_STORAGE_PORT;
+    private int sslStoragePort = CqlBulkOutputFormat.DEFAULT_SSL_STORAGE_PORT;
+    private String serverKeystore;
+    private String serverKeystorePassword;
+    private String serverTruststore;
+    private String serverTruststorePassword;
+    private String serverCipherSuites;
+    private String internodeEncrypt;
+
     public CqlNativeStorage()
     {
         this(1000);
@@ -386,57 +406,22 @@ public class CqlNativeStorage extends 
AbstractCassandraStorage
         return keys;
     }
 
-
-    /** output: (((name, value), (name, value)), (value ... value), 
(value...value)) */
-    public void putNext(Tuple t) throws IOException
-    {
-        if (t.size() < 1)
-        {
-            // simply nothing here, we can't even delete without a key
-            logger.warn("Empty output skipped, filter empty tuples to suppress 
this warning");
-            return;
-        }
-
-        if (t.getType(0) == DataType.TUPLE)
-        {
-            if (t.getType(1) == DataType.TUPLE)
-            {
-                Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
-                cqlQueryFromTuple(key, t, 1);
-            }
-            else
-                throw new IOException("Second argument in output must be a 
tuple");
-        }
-        else
-            throw new IOException("First argument in output must be a tuple");
-    }
-
     /** convert key tuple to key map */
     private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
     {
         Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
         for (int i = 0; i < t.size(); i++)
         {
-            if (t.getType(i) == DataType.TUPLE)
-            {
-                Tuple inner = (Tuple) t.get(i);
-                if (inner.size() == 2)
-                {
-                    Object name = inner.get(0);
-                    if (name != null)
-                    {
-                        keys.put(name.toString(), objToBB(inner.get(1)));
-                    }
-                    else
-                        throw new IOException("Key name was empty");
-                }
-                else
-                    throw new IOException("Keys were not in name and value 
pairs");
-            }
-            else
-            {
+            if (t.getType(i) != DataType.TUPLE)
                 throw new IOException("keys was not a tuple");
-            }
+
+            Tuple inner = (Tuple) t.get(i);
+            if (inner.size() != 2)
+                throw new IOException("Keys were not in name and value pairs");
+            Object name = inner.get(0);
+            if (name == null)
+                throw new IOException("Key name was empty");
+            keys.put(name.toString(), objToBB(inner.get(1)));
         }
         return keys;
     }
@@ -446,21 +431,16 @@ public class CqlNativeStorage extends 
AbstractCassandraStorage
     {
         for (int i = offset; i < t.size(); i++)
         {
-            if (t.getType(i) == DataType.TUPLE)
-            {
-                Tuple inner = (Tuple) t.get(i);
-                if (inner.size() > 0)
-                {
-                    List<ByteBuffer> bindedVariables = 
bindedVariablesFromTuple(inner);
-                    if (bindedVariables.size() > 0)
-                        sendCqlQuery(key, bindedVariables);
-                    else
-                        throw new IOException("Missing binded variables");
-                }
-            }
-            else
-            {
+            if (t.getType(i) != DataType.TUPLE)
                 throw new IOException("Output type was not a tuple");
+
+            Tuple inner = (Tuple) t.get(i);
+            if (inner.size() > 0)
+            {
+                List<ByteBuffer> bindedVariables = 
bindedVariablesFromTuple(inner);
+                if (bindedVariables.size() <= 0)
+                    throw new IOException("Missing binded variables");
+                sendCqlQuery(key, bindedVariables);
             }
         }
     }
@@ -561,6 +541,37 @@ public class CqlNativeStorage extends 
AbstractCassandraStorage
         return property.getProperty(PARTITION_FILTER_SIGNATURE);
     }
 
+    /**
+     * Writes one Pig tuple to the output.
+     *
+     *  output: (((name, value), (name, value)), (value ... value), (value...value))
+     *  bulk output: ((value ... value), (value...value))
+     *
+     * An empty tuple is logged and skipped.
+     *
+     * @param t the tuple to store
+     * @throws IOException if the tuple does not have the layout required by
+     *         the active output mode
+     */
+    public void putNext(Tuple t) throws IOException
+    {
+        if (t.size() < 1)
+        {
+            // simply nothing here, we can't even delete without a key
+            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+            return;
+        }
+
+        if (t.getType(0) != DataType.TUPLE)
+            throw new IOException("First argument in output must be a tuple");
+
+        if (bulkOutputFormat)
+        {
+            // bulk rows carry no key tuple: every field is a tuple of bound values
+            cqlQueryFromTuple(null, t, 0);
+            return;
+        }
+
+        // Check the size first so that a key-only tuple gets the descriptive
+        // error below rather than whatever getType(1) does for a missing field.
+        if (t.size() < 2 || t.getType(1) != DataType.TUPLE)
+            throw new IOException("Second argument in output must be a tuple");
+
+        Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
+        cqlQueryFromTuple(key, t, 1);
+    }
+
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
@@ -688,6 +699,42 @@ public class CqlNativeStorage extends 
AbstractCassandraStorage
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         CqlConfigHelper.setOutputCql(conf, outputQuery);
 
+        if (bulkOutputFormat)
+        {
+            DEFAULT_OUTPUT_FORMAT = BULK_OUTPUT_FORMAT;
+            if (bulkCfSchema != null)
+                CqlBulkOutputFormat.setColumnFamilySchema(conf, column_family, 
bulkCfSchema);
+            else
+                throw new IOException("bulk_cf_schema is missing in input url 
parameter");
+            if (bulkInsertStatement != null)
+                CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, 
column_family, bulkInsertStatement);
+            else
+                throw new IOException("bulk_insert_statement is missing in 
input url parameter");
+            if (bulkOutputLocation != null)
+                conf.set(AbstractBulkRecordWriter.OUTPUT_LOCATION, 
bulkOutputLocation);
+            if (bulkBuffSize > 0)
+                conf.set(AbstractBulkRecordWriter.BUFFER_SIZE_IN_MB, 
String.valueOf(bulkBuffSize));
+            if (bulkStreamThrottle > 0)
+                conf.set(AbstractBulkRecordWriter.STREAM_THROTTLE_MBITS, 
String.valueOf(bulkStreamThrottle));
+            if (bulkMaxFailedHosts > 0)
+                conf.set(AbstractBulkRecordWriter.MAX_FAILED_HOSTS, 
String.valueOf(bulkMaxFailedHosts));
+            CqlBulkOutputFormat.setSSLStoragePort(conf, sslStoragePort);
+            CqlBulkOutputFormat.setStoragePort(conf, storagePort);
+            if (serverEncrypted())
+            {
+                // Pass the encryption mode itself on to the job configuration;
+                // otherwise the bulk record writer only ever reads the default
+                // "none". Lower-cased to match the InternodeEncryption constant names.
+                CqlBulkOutputFormat.setInternodeEncryption(conf, internodeEncrypt.toLowerCase());
+                if (!StringUtils.isEmpty(serverKeystore))
+                    CqlBulkOutputFormat.setServerKeystore(conf, serverKeystore);
+                if (!StringUtils.isEmpty(serverTruststore))
+                    CqlBulkOutputFormat.setServerTruststore(conf, serverTruststore);
+                if (!StringUtils.isEmpty(serverKeystorePassword))
+                    CqlBulkOutputFormat.setServerKeystorePassword(conf, serverKeystorePassword);
+                if (!StringUtils.isEmpty(serverTruststorePassword))
+                    CqlBulkOutputFormat.setServerTruststorePassword(conf, serverTruststorePassword);
+                if (!StringUtils.isEmpty(serverCipherSuites))
+                    CqlBulkOutputFormat.setServerCipherSuites(conf, serverCipherSuites);
+            }
+        }
+
         setConnectionInformation();
 
         if (ConfigHelper.getOutputRpcPort(conf) == 0)
@@ -700,6 +747,12 @@ public class CqlNativeStorage extends 
AbstractCassandraStorage
         initSchema(storeSignature);
     }
 
+    /**
+     * @return true when an internode_encrypt value was supplied and it names
+     *         (ignoring case) a mode other than {@code none}
+     */
+    private boolean serverEncrypted()
+    {
+        if (StringUtils.isEmpty(internodeEncrypt))
+            return false;
+
+        InternodeEncryption mode = InternodeEncryption.valueOf(internodeEncrypt.toLowerCase());
+        return mode != InternodeEncryption.none;
+    }
+
     private void setLocationFromUri(String location) throws IOException
     {
         try
@@ -720,6 +773,37 @@ public class CqlNativeStorage extends 
AbstractCassandraStorage
                 if (urlQuery.containsKey("output_query"))
                     outputQuery = urlQuery.get("output_query");
 
+                if (urlQuery.containsKey("bulk_output_format"))
+                    bulkOutputFormat = 
Boolean.valueOf(urlQuery.get("bulk_output_format"));
+                if (urlQuery.containsKey("bulk_cf_schema"))
+                    bulkCfSchema = urlQuery.get("bulk_cf_schema");
+                if (urlQuery.containsKey("bulk_insert_statement"))
+                    bulkInsertStatement = 
urlQuery.get("bulk_insert_statement");
+                if (urlQuery.containsKey("bulk_output_location"))
+                    bulkOutputLocation = urlQuery.get("bulk_output_location");
+                if (urlQuery.containsKey("bulk_buff_size"))
+                    bulkBuffSize = 
Integer.valueOf(urlQuery.get("bulk_buff_size"));
+                if (urlQuery.containsKey("bulk_stream_throttle"))
+                    bulkStreamThrottle = 
Integer.valueOf(urlQuery.get("bulk_stream_throttle"));
+                if (urlQuery.containsKey("bulk_max_failed_hosts"))
+                    bulkMaxFailedHosts = 
Integer.valueOf(urlQuery.get("bulk_max_failed_hosts"));
+                if (urlQuery.containsKey("storage_port"))
+                    storagePort = 
Integer.valueOf(urlQuery.get("storage_port"));
+                if (urlQuery.containsKey("ssl_storage_port"))
+                    sslStoragePort = 
Integer.valueOf(urlQuery.get("ssl_storage_port"));
+                if (urlQuery.containsKey("internode_encrypt"))
+                    internodeEncrypt = urlQuery.get("internode_encrypt");
+                if (urlQuery.containsKey("server_keystore"))
+                    serverKeystore = urlQuery.get("server_keystore");
+                if (urlQuery.containsKey("server_truststore"))
+                    serverTruststore = urlQuery.get("server_truststore");
+                if (urlQuery.containsKey("server_keystore_pass"))
+                    serverKeystorePassword = 
urlQuery.get("server_keystore_pass");
+                if (urlQuery.containsKey("server_truststore_pass"))
+                    serverTruststorePassword = 
urlQuery.get("server_truststore_pass");
+                if (urlQuery.containsKey("server_cipher_suites"))
+                    serverCipherSuites = urlQuery.get("server_cipher_suites");
+
                 //split size
                 if (urlQuery.containsKey("split_size"))
                     splitSize = Integer.parseInt(urlQuery.get("split_size"));
@@ -804,8 +888,15 @@ public class CqlNativeStorage extends 
AbstractCassandraStorage
                     
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]"
 +
                     
"[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]"
 +
                     
"[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]"
 +
-                    "[columns=<columns>][where_clause=<where_clause>]]': " + 
e.getMessage());
-        }
+                    "[columns=<columns>][where_clause=<where_clause>]" +
+                    
"[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement]"
 +
+                    
"[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>]"
 +
+                    
"[&storage_port=<storage_port>][&ssl_storage_port=<ssl_storage_port>]" +
+                    
"[&server_keystore=<server_keystore>][&server_keystore_pass=<server_keystore_pass>]"
 +
+                    
"[&server_truststore=<server_truststore>][&server_truststore_pass=<server_truststore_pass>]"
 +
+                    
"[&server_cipher_suites=<server_cipher_suites>][&internode_encrypt=<internode_encrypt>]"
 +
+                    
"[&bulk_stream_throttle=<bulk_stream_throttle>][&bulk_max_failed_hosts=<bulk_max_failed_hosts>]]':
 " +  e.getMessage());
+         }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java 
b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index c7277fa..66583ec 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -38,4 +38,3 @@ public class CqlStorage extends CqlNativeStorage
         super(pageSize);
     }
 }
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java 
b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 88a4404..f4b30cb 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -254,7 +254,7 @@ public class BulkLoader
         }
     }
 
-    static class ExternalClient extends SSTableLoader.Client
+    public static class ExternalClient extends SSTableLoader.Client
     {
         private final Map<String, CFMetaData> knownCfs = new HashMap<>();
         private final Set<InetAddress> hosts;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index ec988e2..7be72dd 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -10,6 +10,7 @@ commitlog_segment_size_in_mb: 5
 partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
 listen_address: 127.0.0.1
 storage_port: 7010
+ssl_storage_port: 7011
 rpc_port: 9170
 start_native_transport: true
 native_transport_port: 9042

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java 
b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 4ca043d..2e1758e 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@ -53,6 +53,10 @@ public class CqlTableTest extends PigTestBase
             "CREATE INDEX test_b on test (b);",
 
             "CREATE TABLE moredata (x int PRIMARY KEY, y int);",
+            "CREATE TABLE test_bulk (a int PRIMARY KEY, b int);",
+            "INSERT INTO test_bulk (a,b) VALUES (1,1);",
+            "INSERT INTO test_bulk (a,b) VALUES (2,2);",
+            "INSERT INTO test_bulk (a,b) VALUES (3,3);",
             "INSERT INTO test (a,b) VALUES (1,1);",
             "INSERT INTO test (a,b) VALUES (2,2);",
             "INSERT INTO test (a,b) VALUES (3,3);",
@@ -160,10 +164,13 @@ public class CqlTableTest extends PigTestBase
         //input_cql=select * from test where token(a) > ? and token(a) <= ?
         pig.registerQuery("result= LOAD 'cql://cql3ks/test?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20test%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
         Iterator<Tuple> it = pig.openIterator("result");
+        int count = 0;
         while (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), t.get(1));
+            count ++;
         }
+        Assert.assertEquals(6, count);
     }
 
     @Test
@@ -310,4 +317,33 @@ public class CqlTableTest extends PigTestBase
             Assert.fail("Can't fetch any data");
         }
     }
+
+    @Test
+    public void testCqlStorageSingleKeyTableBulkLoad()
+    throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
+    {
+        pig.setBatchOn();
+        //input_cql=select * from moredata where token(x) > ? and token(x) <= ?
+        pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+        pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE 
TOTUPLE(x, y);");
+        pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test_bulk?" + 
defaultParameters + nativeParameters +  
"&bulk_output_format=true&bulk_cf_schema=CREATE%20TABLE%20cql3ks.test_bulk%20(a%20int%20PRIMARY%20KEY%2C%20b%20int)&bulk_insert_statement=Insert%20into%20cql3ks.test_bulk(a%2C%20b)%20values(%3F%2C%3F)'
 USING CqlNativeStorage();");
+        pig.executeBatch();
+
+        //(5,5)
+        //(6,6)
+        //(4,4)
+        //(2,2)
+        //(3,3)
+        //(1,1)
+        //input_cql=select * from test_bulk where token(a) > ? and token(a) <= ?
+        pig.registerQuery("result= LOAD 'cql://cql3ks/test_bulk?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20test_bulk%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+        Iterator<Tuple> it = pig.openIterator("result");
+        int count = 0;
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            Assert.assertEquals(t.get(0), t.get(1));
+            count ++;
+        }
+        Assert.assertEquals(6, count);
+     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java 
b/test/pig/org/apache/cassandra/pig/PigTestBase.java
index 4b3e422..e6964f8 100644
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@ -65,7 +65,8 @@ public class PigTestBase extends SchemaLoader
     protected static Configuration conf;
     protected static MiniCluster cluster; 
     protected static PigServer pig;
-    protected static String defaultParameters= 
"init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
+    protected static String defaultParameters= 
"init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner"
 +
+                                               
"&storage_port=7010&ssl_storage_port=7011&internode_encrypt=NONE";
     protected static String nativeParameters = 
"&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000"
  +
                                                
"&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3"
 +
                                                
"&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9042";

Reply via email to