Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8123b3b0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8123b3b0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8123b3b0

Branch: refs/heads/trunk
Commit: 8123b3b07dbeee3628d98447651724c731e1170b
Parents: 25de92e 31fc6d2
Author: Robert Stupp <sn...@snazy.de>
Authored: Thu Oct 1 14:16:40 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Thu Oct 1 14:16:40 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/cql3/CqlBulkOutputFormat.java        | 32 ++++++++++++++++++++
 .../hadoop/cql3/CqlBulkRecordWriter.java        | 13 +++++++-
 3 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8123b3b0/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 45070b2,eec8161..9c70c74
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
 -2.1.10
 +2.2.2
 + * cqlsh prompt includes name of keyspace after failed `use` statement (CASSANDRA-10369)
 + * Configurable page size in cqlsh (CASSANDRA-9855)
 + * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
 + * Cancel transaction for sstables we won't redistribute index summary
 +   for (CASSANDRA-10270)
 + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209)
 + * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
 + * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
 + * Fix repair hang when snapshot failed (CASSANDRA-10057)
 + * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
 +   (CASSANDRA-10199)
 +Merged from 2.1:
+  * Bulk Loader API could not tolerate even a single node failure (CASSANDRA-10347)
   * Avoid misleading pushed notifications when multiple nodes
     share an rpc_address (CASSANDRA-10052)
   * Fix dropping undroppable when message queue is full (CASSANDRA-10113)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8123b3b0/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 3899f8c,7fedb41..051447c
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@@ -20,10 -20,11 +20,11 @@@ package org.apache.cassandra.hadoop.cql
  
  import java.io.IOException;
  import java.nio.ByteBuffer;
++import java.util.Collection;
  import java.util.List;
  
 -import org.apache.cassandra.config.EncryptionOptions;
 -import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
  import org.apache.cassandra.hadoop.ConfigHelper;
 +import org.apache.cassandra.hadoop.HadoopCompat;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.mapred.JobConf;
@@@ -79,99 -85,124 +80,130 @@@ public class CqlBulkOutputFormat extend
      {
          return new CqlBulkRecordWriter(context);
      }
 -    
 -    public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
 -    {
 -        conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
 -    }
  
 -    public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
 +    @Override
 +    public void checkOutputSpecs(JobContext context)
      {
 -        conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
 +        checkOutputSpecs(HadoopCompat.getConfiguration(context));
      }
  
 -    public static void setStoragePort(Configuration conf, int port)
 +    private void checkOutputSpecs(Configuration conf)
      {
 -        conf.set(OUTPUT_CQL_STORAGE_PORT, "" + port);
 +        if (ConfigHelper.getOutputKeyspace(conf) == null)
 +        {
 +            throw new UnsupportedOperationException("you must set the 
keyspace with setTable()");
 +        }
      }
  
 -    public static void setSSLStoragePort(Configuration conf, int port)
 +    /** Fills the deprecated OutputFormat interface for streaming. */
 +    @Deprecated
 +    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, 
org.apache.hadoop.mapred.JobConf job) throws IOException
      {
 -        conf.set(OUTPUT_CQL_SSL_STORAGE_PORT, "" + port);
 +        checkOutputSpecs(job);
      }
  
 -    public static void setInternodeEncryption(Configuration conf, String 
encrypt)
 +    @Override
 +    public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
throws IOException, InterruptedException
      {
 -        conf.set(INTERNODE_ENCRYPTION, encrypt);
 +        return new NullOutputCommitter();
      }
 -
 -    public static void setServerKeystore(Configuration conf, String keystore)
 +    
 +    public static void setTableSchema(Configuration conf, String columnFamily, String schema)
      {
 -        conf.set(SERVER_KEYSTORE, keystore);
 +        conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
      }
  
 -    public static void setServerKeystorePassword(Configuration conf, String keystorePass)
 +    public static void setTableInsertStatement(Configuration conf, String columnFamily, String insertStatement)
      {
 -        conf.set(SERVER_KEYSTORE_PASSWORD, keystorePass);
 +        conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
      }
 -
 -    public static void setServerTruststore(Configuration conf, String truststore)
 +    
 +    public static String getTableSchema(Configuration conf, String columnFamily)
      {
 -        conf.set(SERVER_TRUSTSTORE, truststore);
 +        String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
 +        if (schema == null)
 +        { 
 +            throw new UnsupportedOperationException("You must set the Table 
schema using setTableSchema.");
 +        }
 +        return schema; 
      }
 -
 -    public static void setServerTruststorePassword(Configuration conf, String 
truststorePass)
 +    
 +    public static String getTableInsertStatement(Configuration conf, String 
columnFamily)
      {
 -        conf.set(SERVER_TRUSTSTORE_PASSWORD, truststorePass);
 +        String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily); 
 +        if (insert == null)
 +        {
 +            throw new UnsupportedOperationException("You must set the Table 
insert statement using setTableSchema.");
 +        }
 +        return insert;
      }
 -
 -    public static void setServerCipherSuites(Configuration conf, String 
cipherSuites)
 +    
 +    public static void setDeleteSourceOnSuccess(Configuration conf, boolean 
deleteSrc)
      {
 -        conf.set(SERVER_CIPHER_SUITES, cipherSuites);
 +        conf.setBoolean(DELETE_SOURCE, deleteSrc);
      }
 -
 -    public static int getStoragePort(Configuration conf)
 +    
 +    public static boolean getDeleteSourceOnSuccess(Configuration conf)
      {
 -        return conf.getInt(OUTPUT_CQL_STORAGE_PORT, DEFAULT_STORAGE_PORT);
 +        return conf.getBoolean(DELETE_SOURCE, false);
      }
 -
 -    public static int getSSLStoragePort(Configuration conf)
 +    
 +    public static void setTableAlias(Configuration conf, String alias, String columnFamily)
      {
 -        return conf.getInt(OUTPUT_CQL_SSL_STORAGE_PORT, DEFAULT_SSL_STORAGE_PORT);
 +        conf.set(TABLE_ALIAS_PREFIX + alias, columnFamily);
      }
 -
 -    public static String getInternodeEncryption(Configuration conf)
 +    
 +    public static String getTableForAlias(Configuration conf, String alias)
      {
 -        return conf.get(INTERNODE_ENCRYPTION, EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none.name());
 +        return conf.get(TABLE_ALIAS_PREFIX + alias);
      }
  
 -    public static String getServerKeystore(Configuration conf)
++    /**
++     * Set the hosts to ignore as comma delimited values.
++     * Data will not be bulk loaded onto the ignored nodes.
++     * @param conf job configuration
++     * @param ignoreNodesCsv a comma delimited list of nodes to ignore
++     */
++    public static void setIgnoreHosts(Configuration conf, String ignoreNodesCsv)
+     {
 -        return conf.get(SERVER_KEYSTORE);
++        conf.set(CqlBulkRecordWriter.IGNORE_HOSTS, ignoreNodesCsv);
+     }
+ 
 -    public static String getServerTruststore(Configuration conf)
++    /**
++     * Set the hosts to ignore. Data will not be bulk loaded onto the ignored nodes.
++     * @param conf job configuration
++     * @param ignoreNodes the nodes to ignore
++     */
++    public static void setIgnoreHosts(Configuration conf, String... ignoreNodes)
+     {
 -        return conf.get(SERVER_TRUSTSTORE);
++        conf.setStrings(CqlBulkRecordWriter.IGNORE_HOSTS, ignoreNodes);
+     }
+ 
 -    public static String getServerKeystorePassword(Configuration conf)
++    /**
++     * Get the hosts to ignore as a collection of strings
++     * @param conf job configuration
++     * @return the nodes to ignore as a collection of strings
++     */
++    public static Collection<String> getIgnoreHosts(Configuration conf)
+     {
 -        return conf.get(SERVER_KEYSTORE_PASSWORD);
++        return conf.getStringCollection(CqlBulkRecordWriter.IGNORE_HOSTS);
+     }
+ 
 -    public static String getServerTruststorePassword(Configuration conf)
 +    public static class NullOutputCommitter extends OutputCommitter
      {
 -        return conf.get(SERVER_TRUSTSTORE_PASSWORD);
 -    }
 +        public void abortTask(TaskAttemptContext taskContext) { }
  
 -    public static String getServerCipherSuites(Configuration conf)
 -    {
 -        return conf.get(SERVER_CIPHER_SUITES);
 -    }
 +        public void cleanupJob(JobContext jobContext) { }
  
 -    public static String getColumnFamilySchema(Configuration conf, String columnFamily)
 -    {
 -        String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
 -        if (schema == null)
 -        { 
 -            throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
 -        }
 -        return schema; 
 -    }
 -    
 -    public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
 -    {
 -        String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily); 
 -        if (insert == null)
 +        public void commitTask(TaskAttemptContext taskContext) { }
 +
 +        public boolean needsTaskCommit(TaskAttemptContext taskContext)
          {
 -            throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilySchema.");
 +            return false;
          }
 -        return insert;
 -    }
 -    
 -    public static void setDeleteSourceOnSuccess(Configuration conf, boolean deleteSrc)
 -    {
 -        conf.setBoolean(DELETE_SOURCE, deleteSrc);
 -    }
 -    
 -    public static boolean getDeleteSourceOnSuccess(Configuration conf)
 -    {
 -        return conf.getBoolean(DELETE_SOURCE, false);
 +
 +        public void setupJob(JobContext jobContext) { }
 +
 +        public void setupTask(TaskAttemptContext taskContext) { }
      }
  }
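
For readers following the API change, here is a minimal usage sketch of the table-named setters and the new ignore-hosts option shown above. It is a sketch, not part of the commit: the keyspace, table, schema, addresses and job name are placeholders, and the surrounding ConfigHelper calls are assumed to be the usual Hadoop output setters.

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadJobSetup
    {
        public static Job configure() throws Exception
        {
            Configuration conf = new Configuration();

            // Placeholder keyspace, table and seed address for illustration only.
            ConfigHelper.setOutputKeyspace(conf, "ks1");
            ConfigHelper.setOutputColumnFamily(conf, "ks1", "table1");
            ConfigHelper.setOutputInitialAddress(conf, "10.0.0.1");
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");

            // Schema and insert statement for the target table (2.2 naming).
            CqlBulkOutputFormat.setTableSchema(conf, "table1",
                    "CREATE TABLE ks1.table1 (k int PRIMARY KEY, v text)");
            CqlBulkOutputFormat.setTableInsertStatement(conf, "table1",
                    "INSERT INTO ks1.table1 (k, v) VALUES (?, ?)");

            // New in this merge (CASSANDRA-10347): skip streaming to known-down
            // replicas instead of failing the whole bulk load.
            CqlBulkOutputFormat.setIgnoreHosts(conf, "10.0.0.7", "10.0.0.8");

            Job job = Job.getInstance(conf, "cql-bulk-load");
            job.setOutputFormatClass(CqlBulkOutputFormat.class);
            return job;
        }
    }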

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8123b3b0/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 9e6e23b,ced8aa9..d064e27
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@@ -61,26 -59,10 +61,28 @@@ import org.apache.hadoop.util.Progressa
   *
   * @see CqlBulkOutputFormat
   */
 -public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>>
 +public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
 +        implements org.apache.hadoop.mapred.RecordWriter<Object, List<ByteBuffer>>
  {
 +    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
 +    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
 +    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
 +    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
++    public final static String IGNORE_HOSTS = "mapreduce.output.bulkoutputformat.ignorehosts";
 +
 +    private final Logger logger = LoggerFactory.getLogger(CqlBulkRecordWriter.class);
 +
 +    protected final Configuration conf;
 +    protected final int maxFailures;
 +    protected final int bufferSize;
 +    protected Closeable writer;
 +    protected SSTableLoader loader;
 +    protected Progressable progress;
 +    protected TaskAttemptContext context;
++    protected final Set<InetAddress> ignores = new HashSet<>();
 +
      private String keyspace;
 -    private String columnFamily;
 +    private String table;
      private String schema;
      private String insertStatement;
      private File outputDir;
@@@ -115,64 -90,45 +117,73 @@@
      {
          // if anything is missing, exceptions will be thrown here, instead of on write()
          keyspace = ConfigHelper.getOutputKeyspace(conf);
 -        columnFamily = ConfigHelper.getOutputColumnFamily(conf);
 -        schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
 -        insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
 -        outputDir = getColumnFamilyDirectory();
 +        table = ConfigHelper.getOutputColumnFamily(conf);
 +        
 +        // check if table is aliased
 +        String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table);
 +        if (aliasedCf != null)
 +            table = aliasedCf;
 +        
 +        schema = CqlBulkOutputFormat.getTableSchema(conf, table);
 +        insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table);
 +        outputDir = getTableDirectory();
          deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
 +        try
 +        {
 +            partitioner = ConfigHelper.getInputPartitioner(conf);
 +        }
 +        catch (Exception e)
 +        {
 +            partitioner = Murmur3Partitioner.instance;
 +        }
++        try
++        {
++            for (String hostToIgnore : CqlBulkOutputFormat.getIgnoreHosts(conf))
++                ignores.add(InetAddress.getByName(hostToIgnore));
++        }
++        catch (UnknownHostException e)
++        {
++            throw new RuntimeException(("Unknown host: " + e.getMessage()));
++        }
 +    }
 +
 +    protected String getOutputLocation() throws IOException
 +    {
 +        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
 +        if (dir == null)
 +            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
 +        return dir;
      }
  
 -    
      private void prepareWriter() throws IOException
      {
 -        try
 +        if (writer == null)
          {
 -            if (writer == null)
 -            {
 -                writer = CQLSSTableWriter.builder()
 -                    .forTable(schema)
 -                    .using(insertStatement)
 -                    .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
 -                    .inDirectory(outputDir)
 -                    .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
 -                    .build();
 -            }
 -            if (loader == null)
 -            {
 -                BulkLoader.ExternalClient externalClient = getExternalClient(conf);
 -                this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
 -                    @Override
 -                    public void onSuccess(StreamState finalState)
 -                    {
 -                        if (deleteSrc)
 -                            FileUtils.deleteRecursive(outputDir);
 -                    }
 -                };
 -            }
 +            writer = CQLSSTableWriter.builder()
 +                                     .forTable(schema)
 +                                     .using(insertStatement)
 +                                     .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
 +                                     .inDirectory(outputDir)
 +                                     .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
 +                                     .withPartitioner(partitioner)
 +                                     .build();
          }
 -        catch (Exception e)
 +
 +        if (loader == null)
          {
 -            throw new IOException(e);
 -        }      
 +            ExternalClient externalClient = new ExternalClient(conf);
 +            externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
 +
 +            loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler())
 +            {
 +                @Override
 +                public void onSuccess(StreamState finalState)
 +                {
 +                    if (deleteSrc)
 +                        FileUtils.deleteRecursive(outputDir);
 +                }
 +            };
 +        }
      }
      
      /**
@@@ -220,82 -175,52 +231,82 @@@
          return dir;
      }
  
 -    private BulkLoader.ExternalClient getExternalClient(Configuration conf)
 +    @Override
 +    public void close(TaskAttemptContext context) throws IOException, InterruptedException
 +    {
 +        close();
 +    }
 +
 +    /** Fills the deprecated RecordWriter interface for streaming. */
 +    @Deprecated
 +    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
 +    {
 +        close();
 +    }
 +
 +    private void close() throws IOException
      {
 -        Set<InetAddress> hosts = new HashSet<InetAddress>();
 -        String outputAddress = ConfigHelper.getOutputInitialAddress(conf);
 -        if (outputAddress == null) outputAddress = "localhost";
 -        String[] nodes = outputAddress.split(",");
 -        for (String node : nodes)
 +        if (writer != null)
          {
 -            try
 +            writer.close();
-             Future<StreamState> future = loader.stream();
++            Future<StreamState> future = loader.stream(ignores);
 +            while (true)
              {
 -                hosts.add(InetAddress.getByName(node));
 +                try
 +                {
 +                    future.get(1000, TimeUnit.MILLISECONDS);
 +                    break;
 +                }
 +                catch (ExecutionException | TimeoutException te)
 +                {
 +                    if (null != progress)
 +                        progress.progress();
 +                    if (null != context)
 +                        HadoopCompat.progress(context);
 +                }
 +                catch (InterruptedException e)
 +                {
 +                    throw new IOException(e);
 +                }
              }
 -            catch (UnknownHostException e)
 +            if (loader.getFailedHosts().size() > 0)
              {
 -                throw new RuntimeException(e);
 +                if (loader.getFailedHosts().size() > maxFailures)
 +                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
 +                else
 +                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
              }
          }
 -        int rpcPort = ConfigHelper.getOutputRpcPort(conf);
 -        String username = ConfigHelper.getOutputKeyspaceUserName(conf);
 -        String password = ConfigHelper.getOutputKeyspacePassword(conf);
 -        ITransportFactory transportFactory = ConfigHelper.getClientTransportFactory(conf);
 -        return new BulkLoader.ExternalClient(hosts,
 -                rpcPort,
 -                username,
 -                password,
 -                transportFactory,
 -                CqlBulkOutputFormat.getStoragePort(conf),
 -                CqlBulkOutputFormat.getSSLStoragePort(conf),
 -                getServerEncryptOpt(conf));
      }
 -
 -    private ServerEncryptionOptions getServerEncryptOpt(Configuration conf)
 +    
 +    public static class ExternalClient extends NativeSSTableLoaderClient
      {
 -        ServerEncryptionOptions encryptOpt = new ServerEncryptionOptions();
 -        String internodeEncrypt = CqlBulkOutputFormat.getInternodeEncryption(conf);
 -        if (StringUtils.isEmpty(internodeEncrypt))
 -            return encryptOpt;
 -
 -        encryptOpt.internode_encryption = EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.valueOf(internodeEncrypt);
 -        encryptOpt.keystore = CqlBulkOutputFormat.getServerKeystore(conf);
 -        encryptOpt.truststore = CqlBulkOutputFormat.getServerTruststore(conf);
 -        encryptOpt.keystore_password = CqlBulkOutputFormat.getServerKeystorePassword(conf);
 -        encryptOpt.truststore_password = CqlBulkOutputFormat.getServerTruststorePassword(conf);
 -        String cipherSuites = CqlBulkOutputFormat.getServerCipherSuites(conf);
 -        if (!StringUtils.isEmpty(cipherSuites))
 -            encryptOpt.cipher_suites = cipherSuites.replace(" ", "").split(",");
 -        return encryptOpt;
 +        public ExternalClient(Configuration conf)
 +        {
 +            super(resolveHostAddresses(conf),
 +                  CqlConfigHelper.getOutputNativePort(conf),
 +                  ConfigHelper.getOutputKeyspaceUserName(conf),
 +                  ConfigHelper.getOutputKeyspacePassword(conf),
 +                  CqlConfigHelper.getSSLOptions(conf).orNull());
 +        }
 +
 +        private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
 +        {
 +            Set<InetAddress> addresses = new HashSet<>();
 +
 +            for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
 +            {
 +                try
 +                {
 +                    addresses.add(InetAddress.getByName(host));
 +                }
 +                catch (UnknownHostException e)
 +                {
 +                    throw new RuntimeException(e);
 +                }
 +            }
 +
 +            return addresses;
 +        }
      }
  }
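
As a rough companion to the writer changes, a hedged sketch of tuning streaming failure tolerance through the configuration keys declared at the top of CqlBulkRecordWriter. Only the key names come from the diff; the chosen values, the addresses, and the assumption that MAX_FAILED_HOSTS defaults to 0 are illustrative.

    import org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter;
    import org.apache.hadoop.conf.Configuration;

    public class WriterTuning
    {
        public static void apply(Configuration conf)
        {
            // SSTable buffer size in MB; prepareWriter() above falls back to "64"
            // when this key is unset.
            conf.set(CqlBulkRecordWriter.BUFFER_SIZE_IN_MB, "128");

            // Tolerate up to two failed hosts before close() throws
            // "Too many hosts failed" (assumed default: 0).
            conf.setInt(CqlBulkRecordWriter.MAX_FAILED_HOSTS, 2);

            // Same effect as CqlBulkOutputFormat.setIgnoreHosts(...): these nodes
            // are passed to loader.stream(ignores) and are never streamed to.
            conf.setStrings(CqlBulkRecordWriter.IGNORE_HOSTS, "10.0.0.7", "10.0.0.8");
        }
    }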

Reply via email to