http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 78080e2..3899f8c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -22,11 +22,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
@@ -35,7 +38,7 @@ import org.apache.hadoop.util.Progressable;
  * The <code>CqlBulkOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
  * bound variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
+ * table.
  *
  * <p>
 * As is the case with the {@link org.apache.cassandra.hadoop.cql3.CqlOutputFormat}, 
@@ -48,13 +51,14 @@ import org.apache.hadoop.util.Progressable;
  * simple.
  * </p>
  */
-public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<ByteBuffer>>
+public class CqlBulkOutputFormat extends OutputFormat<Object, List<ByteBuffer>>
+        implements org.apache.hadoop.mapred.OutputFormat<Object, List<ByteBuffer>>
 {   
   
-    private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
-    private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
+    private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.table.schema.";
+    private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.table.insert.";
     private static final String DELETE_SOURCE = "cassandra.output.delete.source";
-    private static final String COLUMNFAMILY_ALIAS_PREFIX = "cqlbulkoutputformat.columnfamily.alias.";
+    private static final String TABLE_ALIAS_PREFIX = "cqlbulkoutputformat.table.alias.";
   
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
@@ -75,33 +79,60 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
     {
         return new CqlBulkRecordWriter(context);
     }
+
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace with setTable()");
+        }
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
     
-    public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
+    public static void setTableSchema(Configuration conf, String columnFamily, String schema)
     {
         conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
     }
 
-    public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
+    public static void setTableInsertStatement(Configuration conf, String columnFamily, String insertStatement)
     {
         conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
     }
     
-    public static String getColumnFamilySchema(Configuration conf, String columnFamily)
+    public static String getTableSchema(Configuration conf, String columnFamily)
     {
         String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
         if (schema == null)
         { 
-            throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
+            throw new UnsupportedOperationException("You must set the Table schema using setTableSchema.");
         }
         return schema; 
     }
     
-    public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
+    public static String getTableInsertStatement(Configuration conf, String columnFamily)
     {
         String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily); 
         if (insert == null)
         {
-            throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilySchema.");
+            throw new UnsupportedOperationException("You must set the Table insert statement using setTableInsertStatement.");
         }
         return insert;
     }
@@ -116,13 +147,31 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
         return conf.getBoolean(DELETE_SOURCE, false);
     }
     
-    public static void setColumnFamilyAlias(Configuration conf, String alias, String columnFamily)
+    public static void setTableAlias(Configuration conf, String alias, String columnFamily)
     {
-        conf.set(COLUMNFAMILY_ALIAS_PREFIX + alias, columnFamily);
+        conf.set(TABLE_ALIAS_PREFIX + alias, columnFamily);
     }
     
-    public static String getColumnFamilyForAlias(Configuration conf, String alias)
+    public static String getTableForAlias(Configuration conf, String alias)
     {
-        return conf.get(COLUMNFAMILY_ALIAS_PREFIX + alias);
+        return conf.get(TABLE_ALIAS_PREFIX + alias);
+    }
+
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
     }
 }
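
For reference, a minimal sketch of how a job might wire up the renamed helpers; the keyspace, table, schema and insert statement here are hypothetical, not part of this commit:

    Configuration conf = job.getConfiguration();
    // checkOutputSpecs() requires the output keyspace to be set
    ConfigHelper.setOutputKeyspace(conf, "ks");
    ConfigHelper.setOutputColumnFamily(conf, "words");
    // schema and insert statement are now registered per table name
    CqlBulkOutputFormat.setTableSchema(conf, "words",
        "CREATE TABLE ks.words (word text PRIMARY KEY, count int)");
    CqlBulkOutputFormat.setTableInsertStatement(conf, "words",
        "INSERT INTO ks.words (word, count) VALUES (?, ?)");
    job.setOutputFormatClass(CqlBulkOutputFormat.class);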

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 60cd511..e77c4c8 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -17,17 +17,22 @@
  */
 package org.apache.cassandra.hadoop.cql3;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
 import org.apache.cassandra.hadoop.BulkRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
@@ -35,11 +40,12 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
-
 /**
  * The <code>CqlBulkRecordWriter</code> maps the output &lt;key, value&gt;
 * pairs to a Cassandra column family. In particular, it applies the binded variables
@@ -54,10 +60,26 @@ import org.apache.hadoop.util.Progressable;
  *
  * @see CqlBulkOutputFormat
  */
-public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>>
+public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
+        implements org.apache.hadoop.mapred.RecordWriter<Object, List<ByteBuffer>>
 {
+    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+    private final Logger logger = LoggerFactory.getLogger(CqlBulkRecordWriter.class);
+
+    protected final Configuration conf;
+    protected final int maxFailures;
+    protected final int bufferSize;
+    protected Closeable writer;
+    protected SSTableLoader loader;
+    protected Progressable progress;
+    protected TaskAttemptContext context;
+
     private String keyspace;
-    private String columnFamily;
+    private String table;
     private String schema;
     private String insertStatement;
     private File outputDir;
@@ -65,19 +87,25 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
 
     CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
     {
-        super(context);
+        this(HadoopCompat.getConfiguration(context));
+        this.context = context;
         setConfigs();
     }
 
     CqlBulkRecordWriter(Configuration conf, Progressable progress) throws IOException
     {
-        super(conf, progress);
+        this(conf);
+        this.progress = progress;
         setConfigs();
     }
 
     CqlBulkRecordWriter(Configuration conf) throws IOException
     {
-        super(conf);
+        Config.setOutboundBindAny(true);
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
         setConfigs();
     }
     
@@ -85,54 +113,55 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
     {
         // if anything is missing, exceptions will be thrown here, instead of on write()
         keyspace = ConfigHelper.getOutputKeyspace(conf);
-        columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+        table = ConfigHelper.getOutputColumnFamily(conf);
         
-        // check if columnFamily is aliased
-        String aliasedCf = CqlBulkOutputFormat.getColumnFamilyForAlias(conf, columnFamily);
+        // check if table is aliased
+        String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table);
         if (aliasedCf != null)
-            columnFamily = aliasedCf;
+            table = aliasedCf;
         
-        schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
-        insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
-        outputDir = getColumnFamilyDirectory();
+        schema = CqlBulkOutputFormat.getTableSchema(conf, table);
+        insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table);
+        outputDir = getTableDirectory();
         deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
     }
 
-    
+    protected String getOutputLocation() throws IOException
+    {
+        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+        if (dir == null)
+            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+        return dir;
+    }
+
     private void prepareWriter() throws IOException
     {
-        try
+        if (writer == null)
         {
-            if (writer == null)
-            {
-                writer = CQLSSTableWriter.builder()
-                    .forTable(schema)
-                    .using(insertStatement)
-                    .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
-                    .inDirectory(outputDir)
-                    .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
-                    .build();
-            }
-            if (loader == null)
-            {
-                ExternalClient externalClient = new ExternalClient(conf);
-                
-                externalClient.addKnownCfs(keyspace, schema);
-
-                this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
-                    @Override
-                    public void onSuccess(StreamState finalState)
-                    {
-                        if (deleteSrc)
-                            FileUtils.deleteRecursive(outputDir);
-                    }
-                };
-            }
+            writer = CQLSSTableWriter.builder()
+                                     .forTable(schema)
+                                     .using(insertStatement)
+                                     .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
+                                     .inDirectory(outputDir)
+                                     .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
+                                     .build();
         }
-        catch (Exception e)
+
+        if (loader == null)
         {
-            throw new IOException(e);
-        }      
+            ExternalClient externalClient = new ExternalClient(conf);
+            externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
+
+            loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler())
+            {
+                @Override
+                public void onSuccess(StreamState finalState)
+                {
+                    if (deleteSrc)
+                        FileUtils.deleteRecursive(outputDir);
+                }
+            };
+        }
     }
     
     /**
@@ -168,9 +197,9 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
         }
     }
     
-    private File getColumnFamilyDirectory() throws IOException
+    private File getTableDirectory() throws IOException
     {
-        File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, columnFamily, UUID.randomUUID().toString()));
+        File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, table, UUID.randomUUID().toString()));
         
         if (!dir.exists() && !dir.mkdirs())
         {
@@ -179,41 +208,83 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
         
         return dir;
     }
-    
-    public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
     {
-        private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>();
-        
-        public ExternalClient(Configuration conf)
-        {
-            super(conf);
-        }
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
 
-        public void addKnownCfs(String keyspace, String cql)
+    private void close() throws IOException
+    {
+        if (writer != null)
         {
-            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-            
-            if (cfs == null)
+            writer.close();
+            Future<StreamState> future = loader.stream();
+            while (true)
             {
-                cfs = new HashMap<>();
-                knownCqlCfs.put(keyspace, cfs);
+                try
+                {
+                    future.get(1000, TimeUnit.MILLISECONDS);
+                    break;
+                }
+                catch (ExecutionException | TimeoutException te)
+                {
+                    if (null != progress)
+                        progress.progress();
+                    if (null != context)
+                        HadoopCompat.progress(context);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+            if (loader.getFailedHosts().size() > 0)
+            {
+                if (loader.getFailedHosts().size() > maxFailures)
+                    throw new IOException("Too many hosts failed: " + 
loader.getFailedHosts());
+                else
+                    logger.warn("Some hosts failed: {}", 
loader.getFailedHosts());
             }
-            
-            CFMetaData metadata = CFMetaData.compile(cql, keyspace);
-            cfs.put(metadata.cfName, metadata);
         }
-        
-        @Override
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
+    }
+    
+    public static class ExternalClient extends NativeSSTableLoaderClient
+    {
+        public ExternalClient(Configuration conf)
         {
-            CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
-            if (metadata != null)
+            super(resolveHostAddresses(conf),
+                  CqlConfigHelper.getOutputNativePort(conf),
+                  ConfigHelper.getOutputKeyspaceUserName(conf),
+                  ConfigHelper.getOutputKeyspacePassword(conf),
+                  CqlConfigHelper.getSSLOptions(conf).orNull());
+        }
+
+        private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
+        {
+            Set<InetAddress> addresses = new HashSet<>();
+
+            for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
             {
-                return metadata;
+                try
+                {
+                    addresses.add(InetAddress.getByName(host));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
-            
-            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-            return cfs != null ? cfs.get(cfName) : null;
+
+            return addresses;
         }
     }
 }
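
The tuning knobs that used to live on AbstractBulkRecordWriter are now public constants on CqlBulkRecordWriter. A sketch of setting them at job-setup time; the values are illustrative only:

    Configuration conf = job.getConfiguration();
    // local directory where SSTables are built before streaming; defaults to java.io.tmpdir
    conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, "/mnt/scratch/bulk");
    // CQLSSTableWriter buffer size in MB (default "64")
    conf.set(CqlBulkRecordWriter.BUFFER_SIZE_IN_MB, "128");
    // outbound stream throttle in megabits per second; "0" means unthrottled
    conf.set(CqlBulkRecordWriter.STREAM_THROTTLE_MBITS, "400");
    // close() throws once more than this many hosts fail while streaming (default "0")
    conf.set(CqlBulkRecordWriter.MAX_FAILED_HOSTS, "1");

Note that close() flushes the CQLSSTableWriter, starts loader.stream(), and polls the returned future once a second so it can keep reporting progress to Hadoop until streaming finishes.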

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index ac5a7e5..3033fa6 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -34,22 +34,23 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManagerFactory;
 
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.io.util.FileUtils;
+import com.google.common.base.Optional;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 
 import com.datastax.driver.core.AuthProvider;
-import com.datastax.driver.core.PlainTextAuthProvider;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
 import com.datastax.driver.core.PoolingOptions;
 import com.datastax.driver.core.ProtocolOptions;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.SSLOptions;
 import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.google.common.base.Optional;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
 
 public class CqlConfigHelper
 {
@@ -84,6 +85,7 @@ public class CqlConfigHelper
     private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version";
 
     private static final String OUTPUT_CQL = "cassandra.output.cql";
+    private static final String OUTPUT_NATIVE_PORT = "cassandra.output.native.port";
     
     /**
      * Set the CQL columns for the input of this job.
@@ -176,6 +178,11 @@ public class CqlConfigHelper
         return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042"));
     }
 
+    public static int getOutputNativePort(Configuration conf)
+    {
+        return Integer.parseInt(conf.get(OUTPUT_NATIVE_PORT, "9042"));
+    }
+
     public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf)
     {
         return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf);
@@ -294,6 +301,22 @@ public class CqlConfigHelper
     public static Cluster getInputCluster(String[] hosts, Configuration conf)
     {
         int port = getInputNativePort(conf);
+        return getCluster(hosts, conf, port);
+    }
+
+    public static Cluster getOutputCluster(String host, Configuration conf)
+    {
+        return getOutputCluster(new String[]{host}, conf);
+    }
+
+    public static Cluster getOutputCluster(String[] hosts, Configuration conf)
+    {
+        int port = getOutputNativePort(conf);
+        return getCluster(hosts, conf, port);
+    }
+
+    public static Cluster getCluster(String[] hosts, Configuration conf, int port)
+    {
         Optional<AuthProvider> authProvider = getAuthProvider(conf);
         Optional<SSLOptions> sslOptions = getSSLOptions(conf);
         Optional<Integer> protocolVersion = getProtocolVersion(conf);
@@ -301,11 +324,11 @@ public class CqlConfigHelper
         SocketOptions socketOptions = getReadSocketOptions(conf);
         QueryOptions queryOptions = getReadQueryOptions(conf);
         PoolingOptions poolingOptions = getReadPoolingOptions(conf);
-        
+
         Cluster.Builder builder = Cluster.builder()
-                                         .addContactPoints(hosts)
-                                         .withPort(port)
-                                         .withCompression(ProtocolOptions.Compression.NONE);
+                .addContactPoints(hosts)
+                .withPort(port)
+                .withCompression(ProtocolOptions.Compression.NONE);
 
         if (authProvider.isPresent())
             builder.withAuthProvider(authProvider.get());
@@ -316,14 +339,13 @@ public class CqlConfigHelper
             builder.withProtocolVersion(protocolVersion.get());
         }
         builder.withLoadBalancingPolicy(loadBalancingPolicy)
-               .withSocketOptions(socketOptions)
-               .withQueryOptions(queryOptions)
-               .withPoolingOptions(poolingOptions);
+                .withSocketOptions(socketOptions)
+                .withQueryOptions(queryOptions)
+                .withPoolingOptions(poolingOptions);
 
         return builder.build();
     }
 
-
     public static void setInputCoreConnections(Configuration conf, String connections)
     {
         conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
@@ -502,7 +524,7 @@ public class CqlConfigHelper
         return Optional.of(getClientAuthProvider(authProvider.get(), conf));
     }
 
-    private static Optional<SSLOptions> getSSLOptions(Configuration conf)
+    public static Optional<SSLOptions> getSSLOptions(Configuration conf)
     {
         Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf);
         Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf);
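
With getSSLOptions() made public and the new cassandra.output.native.port setting, a writer-side Cluster can be built the same way the input side builds one. A sketch, assuming a reachable contact point (the address is hypothetical):

    Configuration conf = new Configuration();
    ConfigHelper.setOutputInitialAddress(conf, "10.0.0.1");
    conf.set("cassandra.output.native.port", "9042");
    Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
    try (Session session = cluster.connect())
    {
        // execute statements against the output cluster...
    }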

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 0d09ca2..9a1cda6 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -23,15 +23,15 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.*;
+import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapreduce.*;
 
 /**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
+ * The <code>CqlOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
  * bound variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
+ * table.
  *
  * <p>
 * As is the case with the {@link org.apache.cassandra.hadoop.ColumnFamilyInputFormat}, 
@@ -52,8 +52,51 @@ import org.apache.hadoop.mapreduce.*;
  * to Cassandra.
  * </p>
  */
-public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
-{   
+public class CqlOutputFormat extends OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+        implements org.apache.hadoop.mapred.OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+{
+    /**
+     * Check for validity of the output-specification for the job.
+     *
+     * @param context
+     *            information about the job
+     */
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    protected void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+            throw new UnsupportedOperationException("You must set the keyspace 
with setOutputKeyspace()");
+        if (ConfigHelper.getOutputPartitioner(conf) == null)
+            throw new UnsupportedOperationException("You must set the output 
partitioner to the one used by your Cassandra cluster");
+        if (ConfigHelper.getOutputInitialAddress(conf) == null)
+            throw new UnsupportedOperationException("You must set the initial 
output address to a Cassandra node");
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    /**
+     * The OutputCommitter for this format does not write any data to the DFS.
+     *
+     * @param context
+     *            the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
     public CqlRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -73,4 +116,25 @@ public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String
     {
         return new CqlRecordWriter(context);
     }
+
+    /**
+     * An {@link OutputCommitter} that does nothing.
+     */
+    private static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
 }
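
The three settings checkOutputSpecs() now validates inline (previously enforced by AbstractColumnFamilyOutputFormat) can be satisfied roughly as follows; keyspace, address and query are illustrative:

    Configuration conf = job.getConfiguration();
    ConfigHelper.setOutputKeyspace(conf, "ks");
    ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");
    ConfigHelper.setOutputInitialAddress(conf, "10.0.0.1");
    // CqlRecordWriter appends the key WHERE clauses itself, and rejects INSERT
    CqlConfigHelper.setOutputCql(conf, "UPDATE ks.words SET count = ?");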

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 308bdf8..4a7bd59 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -37,13 +37,15 @@ import org.slf4j.LoggerFactory;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.Token;
 import com.datastax.driver.core.TupleValue;
 import com.datastax.driver.core.UDTValue;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.db.SystemKeyspace;
+import com.google.common.reflect.TypeToken;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.hadoop.ColumnFamilySplit;
@@ -493,36 +495,72 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         }
 
         @Override
+        public <T> List<T> getList(int i, TypeToken<T> typeToken)
+        {
+            return row.getList(i, typeToken);
+        }
+
+        @Override
         public <T> List<T> getList(String name, Class<T> elementsClass)
         {
             return row.getList(name, elementsClass);
         }
 
         @Override
+        public <T> List<T> getList(String s, TypeToken<T> typeToken)
+        {
+            return row.getList(s, typeToken);
+        }
+
+        @Override
         public <T> Set<T> getSet(int i, Class<T> elementsClass)
         {
             return row.getSet(i, elementsClass);
         }
 
         @Override
+        public <T> Set<T> getSet(int i, TypeToken<T> typeToken)
+        {
+            return row.getSet(i, typeToken);
+        }
+
+        @Override
         public <T> Set<T> getSet(String name, Class<T> elementsClass)
         {
             return row.getSet(name, elementsClass);
         }
 
         @Override
+        public <T> Set<T> getSet(String s, TypeToken<T> typeToken)
+        {
+            return row.getSet(s, typeToken);
+        }
+
+        @Override
         public <K, V> Map<K, V> getMap(int i, Class<K> keysClass, Class<V> valuesClass)
         {
             return row.getMap(i, keysClass, valuesClass);
         }
 
         @Override
+        public <K, V> Map<K, V> getMap(int i, TypeToken<K> typeToken, TypeToken<V> typeToken1)
+        {
+            return row.getMap(i, typeToken, typeToken1);
+        }
+
+        @Override
         public <K, V> Map<K, V> getMap(String name, Class<K> keysClass, Class<V> valuesClass)
         {
             return row.getMap(name, keysClass, valuesClass);
         }
 
         @Override
+        public <K, V> Map<K, V> getMap(String s, TypeToken<K> typeToken, TypeToken<V> typeToken1)
+        {
+            return row.getMap(s, typeToken, typeToken1);
+        }
+
+        @Override
         public UDTValue getUDTValue(int i)
         {
             return row.getUDTValue(i);
@@ -545,6 +583,24 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         {
             return row.getTupleValue(name);
         }
+
+        @Override
+        public Token getToken(int i)
+        {
+            return row.getToken(i);
+        }
+
+        @Override
+        public Token getToken(String name)
+        {
+            return row.getToken(name);
+        }
+
+        @Override
+        public Token getPartitionKeyToken()
+        {
+            return row.getPartitionKeyToken();
+        }
     }
 
     /**
@@ -604,36 +660,21 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
     private void fetchKeys()
     {
-        String query = String.format("SELECT column_name, component_index, type " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                                     SystemKeyspace.NAME,
-                                     LegacySchemaTables.COLUMNS,
-                                     keyspace,
-                                     cfName);
-
         // get CF meta data
-        List<Row> rows = session.execute(query).all();
-        if (rows.isEmpty())
+        TableMetadata tableMetadata = session.getCluster()
+                                             .getMetadata()
+                                             .getKeyspace(Metadata.quote(keyspace))
+                                             .getTable(Metadata.quote(cfName));
+        if (tableMetadata == null)
         {
             throw new RuntimeException("No table metadata found for " + 
keyspace + "." + cfName);
         }
-        int numberOfPartitionKeys = 0;
-        for (Row row : rows)
-            if (row.getString(2).equals("partition_key"))
-                numberOfPartitionKeys++;
-        String[] partitionKeyArray = new String[numberOfPartitionKeys];
-        for (Row row : rows)
+        //Here we assume that tableMetadata.getPartitionKey() always
+        //returns the list of columns in order of component_index
+        for (ColumnMetadata partitionKey : tableMetadata.getPartitionKey())
         {
-            String type = row.getString(2);
-            String column = row.getString(0);
-            if (type.equals("partition_key"))
-            {
-                int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
-                partitionKeyArray[componentIndex] = column;
-            }
+            partitionKeys.add(partitionKey.getName());
         }
-        partitionKeys.addAll(Arrays.asList(partitionKeyArray));
     }
 
     private String quote(String identifier)
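
The fetchKeys() rewrite trades the system.schema_columns query for driver-side metadata. Stated standalone, the lookup it performs is roughly this (keyspace and table names hypothetical):

    Metadata clusterMetadata = session.getCluster().getMetadata();
    TableMetadata table = clusterMetadata.getKeyspace(Metadata.quote("ks"))
                                         .getTable(Metadata.quote("words"));
    // relies on getPartitionKey() returning columns in component_index order
    List<String> partitionKeys = new ArrayList<>();
    for (ColumnMetadata column : table.getPartitionKey())
        partitionKeys.add(column.getName());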

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index dbbeb47..1d8436b 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -21,37 +21,39 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.db.SystemKeyspace;
+
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.TokenRange;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.utils.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.Progressable;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
 
 /**
- * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
- * pairs to a Cassandra column family. In particular, it applies the binded variables
+ * The <code>CqlRecordWriter</code> maps the output &lt;key, value&gt;
+ * pairs to a Cassandra table. In particular, it applies the bound variables
  * in the value to the prepared statement, which it associates with the key, and in 
  * turn the responsible endpoint.
  *
@@ -63,21 +65,38 @@ import org.apache.thrift.transport.TTransport;
  *
  * @see CqlOutputFormat
  */
-class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements AutoCloseable
+class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements
+        org.apache.hadoop.mapred.RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>, AutoCloseable
 {
     private static final Logger logger = LoggerFactory.getLogger(CqlRecordWriter.class);
 
+    // The configuration this writer is associated with.
+    protected final Configuration conf;
+    // The number of mutations to buffer per endpoint
+    protected final int queueSize;
+
+    protected final long batchThreshold;
+
+    protected Progressable progressable;
+    protected TaskAttemptContext context;
+
+    // The ring cache that describes the token ranges each node in the ring is
+    // responsible for. This is what allows us to group the mutations by
+    // the endpoints they should be targeted at. The targeted endpoint
+    // essentially
+    // acts as the primary replica for the rows being affected by the mutations.
+    private final NativeRingCache ringCache;
+
     // handles for clients for each range running in the threadpool
     protected final Map<InetAddress, RangeClient> clients;
 
     // host to prepared statement id mappings
-    protected final ConcurrentHashMap<Cassandra.Client, Integer> preparedStatements = new ConcurrentHashMap<Cassandra.Client, Integer>();
+    protected final ConcurrentHashMap<Session, PreparedStatement> preparedStatements = new ConcurrentHashMap<Session, PreparedStatement>();
 
     protected final String cql;
 
-    protected AbstractType<?> keyValidator;
-    protected String [] partitionKeyColumns;
-    protected List<String> clusterColumns;
+    protected List<ColumnMetadata> partitionKeyColumns;
+    protected List<ColumnMetadata> clusterColumns;
 
     /**
      * Upon construction, obtain the map that this writer will use to collect
@@ -100,28 +119,28 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
 
     CqlRecordWriter(Configuration conf)
     {
-        super(conf);
+        this.conf = conf;
+        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
         this.clients = new HashMap<>();
 
         try
         {
-            Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
+            String keyspace = ConfigHelper.getOutputKeyspace(conf);
+            Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
+            ringCache = new NativeRingCache(conf);
             if (client != null)
             {
-                client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
-                String user = ConfigHelper.getOutputKeyspaceUserName(conf);
-                String password = ConfigHelper.getOutputKeyspacePassword(conf);
-                if ((user != null) && (password != null))
-                    AbstractColumnFamilyOutputFormat.login(user, password, client);
-                retrievePartitionKeyValidator(client);
+                TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+                clusterColumns = tableMetadata.getClusteringColumns();
+                partitionKeyColumns = tableMetadata.getPartitionKey();
+
                 String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
                 if (cqlQuery.toLowerCase().startsWith("insert"))
                     throw new UnsupportedOperationException("INSERT with 
CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
                 cql = appendKeyWhereClauses(cqlQuery);
 
-                TTransport transport = client.getOutputProtocol().getTransport();
-                if (transport.isOpen())
-                    transport.close();
+                client.close();
             }
             else
             {
@@ -133,7 +152,26 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
             throw new RuntimeException(e);
         }
     }
-    
+
+    /**
+     * Close this <code>RecordWriter</code> to future operations, but not before
+     * flushing out the batched mutations.
+     *
+     * @param context the context of the task
+     * @throws IOException
+     */
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
     @Override
     public void close() throws IOException
     {
@@ -157,7 +195,7 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
     
     /**
      * If the key is to be associated with a valid value, a mutation is created
-     * for it with the given column family and columns. In the event the value
+     * for it with the given table and columns. In the event the value
      * in the column is missing (i.e., null), then it is marked for
      * {@link Deletion}. Similarly, if the entire value for a key is missing
      * (i.e., null), then the entire key is marked for {@link Deletion}.
@@ -172,25 +210,25 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
     @Override
     public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
     {
-        Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns));
+        TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
 
         // get the client for the given range, or create a new one
-       final InetAddress address = ringCache.getEndpoint(range).get(0);
+       final InetAddress address = ringCache.getEndpoints(range).get(0);
         RangeClient client = clients.get(address);
         if (client == null)
         {
             // haven't seen keys for this range: create new client
-            client = new RangeClient(ringCache.getEndpoint(range));
+            client = new RangeClient(ringCache.getEndpoints(range));
             client.start();
             clients.put(address, client);
         }
 
         // add primary key columns to the bind variables
         List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values);
-        for (String column : partitionKeyColumns)
-            allValues.add(keyColumns.get(column));
-        for (String column : clusterColumns)
-            allValues.add(keyColumns.get(column));
+        for (ColumnMetadata column : partitionKeyColumns)
+            allValues.add(keyColumns.get(column.getName()));
+        for (ColumnMetadata column : clusterColumns)
+            allValues.add(keyColumns.get(column.getName()));
 
         client.put(allValues);
 
@@ -204,16 +242,50 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
      * A client that runs in a threadpool and connects to the list of endpoints for a particular
      * range. Bound variables for keys in that range are sent to this client via a queue.
      */
-    public class RangeClient extends AbstractRangeClient<List<ByteBuffer>>
+    public class RangeClient extends Thread
     {
+        // The list of endpoints for this range
+        protected final List<InetAddress> endpoints;
+        protected Session client;
+        // A bounded queue of incoming mutations for this range
+        protected final BlockingQueue<List<ByteBuffer>> queue = new ArrayBlockingQueue<List<ByteBuffer>>(queueSize);
+
+        protected volatile boolean run = true;
+        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
+        // when the client is closed.
+        protected volatile IOException lastException;
+
         /**
          * Constructs an {@link RangeClient} for the given endpoints.
          * @param endpoints the possible endpoints to execute the mutations on
          */
         public RangeClient(List<InetAddress> endpoints)
         {
-            super(endpoints);
-         }
+            super("client-" + endpoints);
+            this.endpoints = endpoints;
+        }
+
+        /**
+         * enqueues the given value to Cassandra
+         */
+        public void put(List<ByteBuffer> value) throws IOException
+        {
+            while (true)
+            {
+                if (lastException != null)
+                    throw lastException;
+                try
+                {
+                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+                        break;
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+        }
         
         /**
          * Loops collecting cql binded variable values from the queue and sending to Cassandra
@@ -234,156 +306,138 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
                     continue;
                 }
 
-                Iterator<InetAddress> iter = endpoints.iterator();
+                ListIterator<InetAddress> iter = endpoints.listIterator();
                 while (true)
                 {
                     // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
+
+                    // attempt to connect to a different endpoint
                     try
                     {
-                        int i = 0;
-                        int itemId = preparedStatement(client);
-                        while (bindVariables != null)
-                        {
-                            client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
-                            i++;
-                            
-                            if (i >= batchThreshold)
-                                break;
-                            
-                            bindVariables = queue.poll();
-                        }
-                        
-                        break;
+                        InetAddress address = iter.next();
+                        String host = address.getHostName();
+                        client = CqlConfigHelper.getOutputCluster(host, conf).connect();
                     }
                     catch (Exception e)
                     {
+                        //If connection died due to Interrupt, just try connecting to the endpoint again.
+                        if (Thread.interrupted()) {
+                            lastException = new IOException(e);
+                            iter.previous();
+                        }
                         closeInternal();
-                        if (!iter.hasNext())
+
+                        // Most exceptions mean something unexpected went wrong to that endpoint, so
+                        // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
+                        if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
                         {
                             lastException = new IOException(e);
                             break outer;
                         }
                     }
 
-                    // attempt to connect to a different endpoint
                     try
                     {
-                        InetAddress address = iter.next();
-                        String host = address.getHostName();
-                        int port = ConfigHelper.getOutputRpcPort(conf);
-                        client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+                        int i = 0;
+                        PreparedStatement statement = preparedStatement(client);
+                        while (bindVariables != null)
+                        {
+                            BoundStatement boundStatement = new BoundStatement(statement);
+                            for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                            {
+                                boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                            }
+                            client.execute(boundStatement);
+                            i++;
+                            
+                            if (i >= batchThreshold)
+                                break;
+                            bindVariables = queue.poll();
+                        }
+                        break;
                     }
                     catch (Exception e)
                     {
                         closeInternal();
-                        // TException means something unexpected went wrong to that endpoint, so
-                        // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
-                        if ((!(e instanceof TException)) || !iter.hasNext())
+                        if (!iter.hasNext())
                         {
                             lastException = new IOException(e);
                             break outer;
                         }
                     }
+
                 }
             }
-
             // close all our connections once we are done.
             closeInternal();
         }
 
         /** get prepared statement id from cache, otherwise prepare it from Cassandra server*/
-        private int preparedStatement(Cassandra.Client client)
+        private PreparedStatement preparedStatement(Session client)
         {
-            Integer itemId = preparedStatements.get(client);
-            if (itemId == null)
+            PreparedStatement statement = preparedStatements.get(client);
+            if (statement == null)
             {
-                CqlPreparedResult result;
+                PreparedStatement result;
                 try
                 {
-                    result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
+                    result = client.prepare(cql);
                 }
-                catch (TException e)
+                catch (NoHostAvailableException e)
                 {
                     throw new RuntimeException("failed to prepare cql query " 
+ cql, e);
                 }
 
-                Integer previousId = preparedStatements.putIfAbsent(client, Integer.valueOf(result.itemId));
-                itemId = previousId == null ? result.itemId : previousId;
+                PreparedStatement previousId = preparedStatements.putIfAbsent(client, result);
+                statement = previousId == null ? result : previousId;
             }
-            return itemId;
+            return statement;
         }
-    }
 
-    private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
-    {
-        ByteBuffer partitionKey;
-        if (keyValidator instanceof CompositeType)
+        public void close() throws IOException
         {
-            ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length];
-            for (int i = 0; i< keys.length; i++)
-                keys[i] = keyColumns.get(partitionKeyColumns[i]);
+            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
+            run = false;
+            interrupt();
+            try
+            {
+                this.join();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
 
-            partitionKey = CompositeType.build(keys);
-        }
-        else
-        {
-            partitionKey = keyColumns.get(partitionKeyColumns[0]);
+            if (lastException != null)
+                throw lastException;
         }
-        return partitionKey;
-    }
 
-    // FIXME
-    /** retrieve the key validator from system.schema_columnfamilies table */
-    private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
-    {
-        String keyspace = ConfigHelper.getOutputKeyspace(conf);
-        String cfName = ConfigHelper.getOutputColumnFamily(conf);
-        String query = String.format("SELECT key_validator, key_aliases, column_aliases " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = '%s' and 
columnfamily_name = '%s'",
-                                     SystemKeyspace.NAME,
-                                     LegacySchemaTables.COLUMNFAMILIES,
-                                     keyspace,
-                                     cfName);
-        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
-        Column rawKeyValidator = result.rows.get(0).columns.get(0);
-        String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
-        keyValidator = parseType(validator);
-        
-        Column rawPartitionKeys = result.rows.get(0).columns.get(1);
-        String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue()));
-        logger.debug("partition keys: {}", keyString);
-
-        List<String> keys = FBUtilities.fromJsonList(keyString);
-        partitionKeyColumns = new String[keys.size()];
-        int i = 0;
-        for (String key : keys)
+
+        protected void closeInternal()
         {
-            partitionKeyColumns[i] = key;
-            i++;
+            if (client != null)
+            {
-                client.close();
+            }
         }
-
-        Column rawClusterColumns = result.rows.get(0).columns.get(2);
-        String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue()));
-
-        logger.debug("cluster columns: {}", clusterColumnString);
-        clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
     }
 
-    private AbstractType<?> parseType(String type) throws ConfigurationException
+    private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
     {
-        try
+        ByteBuffer partitionKey;
+        if (partitionKeyColumns.size() > 1)
         {
-            // always treat counters like longs, specifically CCT.serialize is not what we need
-            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
-                return LongType.instance;
-            return TypeParser.parse(type);
+            ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.size()];
+            for (int i = 0; i< keys.length; i++)
+                keys[i] = keyColumns.get(partitionKeyColumns.get(i).getName());
+
+            partitionKey = CompositeType.build(keys);
         }
-        catch (SyntaxException e)
+        else
         {
-            throw new ConfigurationException(e.getMessage(), e);
+            partitionKey = keyColumns.get(partitionKeyColumns.get(0).getName());
         }
+        return partitionKey;
     }
 
     /**
@@ -393,10 +447,10 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
     {
         String keyWhereClause = "";
 
-        for (String partitionKey : partitionKeyColumns)
-            keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey) : (" AND " + quote(partitionKey)));
-        for (String clusterColumn : clusterColumns)
-            keyWhereClause += " AND " + quote(clusterColumn) + " = ?";
+        for (ColumnMetadata partitionKey : partitionKeyColumns)
+            keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey.getName()) : (" AND " + quote(partitionKey.getName())));
+        for (ColumnMetadata clusterColumn : clusterColumns)
+            keyWhereClause += " AND " + quote(clusterColumn.getName()) + " = ?";
 
         return cqlQuery + " WHERE " + keyWhereClause;
     }
@@ -406,4 +460,60 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
     {
         return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
     }
+
+    class NativeRingCache
+    {
+        private Map<TokenRange, Set<Host>> rangeMap;
+        private Metadata metadata;
+        private final IPartitioner partitioner;
+        private final Configuration conf;
+
+        public NativeRingCache(Configuration conf)
+        {
+            this.conf = conf;
+            this.partitioner = ConfigHelper.getOutputPartitioner(conf);
+            refreshEndpointMap();
+        }
+
+
+        private void refreshEndpointMap()
+        {
+            String keyspace = ConfigHelper.getOutputKeyspace(conf);
+            Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
+            rangeMap = new HashMap<>();
+            metadata = session.getCluster().getMetadata();
+            Set<TokenRange> ranges = metadata.getTokenRanges();
+            for (TokenRange range : ranges)
+            {
+                rangeMap.put(range, metadata.getReplicas(keyspace, range));
+            }
+        }
+
+        public TokenRange getRange(ByteBuffer key)
+        {
+            Token t = partitioner.getToken(key);
+            com.datastax.driver.core.Token driverToken = metadata.newToken(partitioner.getTokenFactory().toString(t));
+            for (TokenRange range : rangeMap.keySet())
+            {
+                if (range.contains(driverToken))
+                {
+                    return range;
+                }
+            }
+
+            throw new RuntimeException("Invalid token information returned by 
describe_ring: " + rangeMap);
+        }
+
+        public List<InetAddress> getEndpoints(TokenRange range)
+        {
+            Set<Host> hostSet = rangeMap.get(range);
+            List<Host> hosts = Arrays.asList(rangeMap.get(range).toArray(new Host[rangeMap.get(range).size()]));
+            List<InetAddress> addresses = new ArrayList<>(hosts.size());
+            for (Host host: hosts)
+            {
+                addresses.add(host.getAddress());
+            }
+            return addresses;
+        }
+    }
 }
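
NativeRingCache rebuilds, purely from driver metadata, the range-to-replica view that the Thrift describe_ring call used to provide. A standalone sketch of the same mapping (the keyspace name is hypothetical):

    Metadata metadata = cluster.getMetadata();
    Map<TokenRange, Set<Host>> rangeMap = new HashMap<>();
    for (TokenRange range : metadata.getTokenRanges())
        rangeMap.put(range, metadata.getReplicas("ks", range));
    // a write is then routed by finding the range containing the partition key's
    // token and using the first replica of that range as the coordinator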
