Repository: cassandra
Updated Branches:
  refs/heads/trunk d40ac784d -> 47d3b7e7a


http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 8b59bda..1964c27 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -29,15 +29,19 @@ import java.net.URI;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.stress.generate.*;
@@ -57,12 +61,14 @@ import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 import org.yaml.snakeyaml.error.YAMLException;
 
+import static org.apache.cassandra.io.sstable.CQLSSTableWriter.parseStatement;
+
 public class StressProfile implements Serializable
 {
     private String keyspaceCql;
     private String tableCql;
     private List<String> extraSchemaDefinitions;
-    private String seedStr;
+    public final String seedStr = "seed for stress";
 
     public String keyspaceName;
     public String tableName;
@@ -96,7 +102,6 @@ public class StressProfile implements Serializable
         keyspaceCql = yaml.keyspace_definition;
         tableName = yaml.table;
         tableCql = yaml.table_definition;
-        seedStr = "seed for stress";
         queries = yaml.queries;
         tokenRangeQueries = yaml.token_range_queries;
         insert = yaml.insert;
@@ -360,6 +365,87 @@ public class StressProfile implements Serializable
         return new TokenRangeQuery(timer, settings, tableMetaData, tokenRangeIterator, def, isWarmup);
     }
 
+
+    public PartitionGenerator getOfflineGenerator()
+    {
+        CFMetaData cfMetaData = CFMetaData.compile(tableCql, keyspaceName);
+
+        //Add missing column configs
+        Iterator<ColumnDefinition> it = cfMetaData.allColumnsInSelectOrder();
+        while (it.hasNext())
+        {
+            ColumnDefinition c = it.next();
+            if (!columnConfigs.containsKey(c.name.toString()))
+                columnConfigs.put(c.name.toString(), new GeneratorConfig(seedStr + c.name.toString(), null, null, null));
+        }
+
+        List<Generator> partitionColumns = cfMetaData.partitionKeyColumns().stream()
+                                                     .map(c -> new ColumnInfo(c.name.toString(), c.type.asCQL3Type().toString(), "", columnConfigs.get(c.name.toString())))
+                                                     .map(c -> c.getGenerator())
+                                                     .collect(Collectors.toList());
+
+        List<Generator> clusteringColumns = cfMetaData.clusteringColumns().stream()
+                                                      .map(c -> new ColumnInfo(c.name.toString(), c.type.asCQL3Type().toString(), "", columnConfigs.get(c.name.toString())))
+                                                      .map(c -> c.getGenerator())
+                                                      .collect(Collectors.toList());
+
+        List<Generator> regularColumns = com.google.common.collect.Lists.newArrayList(cfMetaData.partitionColumns().selectOrderIterator()).stream()
+                                                                        .map(c -> new ColumnInfo(c.name.toString(), c.type.asCQL3Type().toString(), "", columnConfigs.get(c.name.toString())))
+                                                                        .map(c -> c.getGenerator())
+                                                                        .collect(Collectors.toList());
+
+        return new PartitionGenerator(partitionColumns, clusteringColumns, regularColumns, PartitionGenerator.Order.ARBITRARY);
+    }
+
+    public CreateTableStatement.RawStatement getCreateStatement()
+    {
+        CreateTableStatement.RawStatement createStatement = parseStatement(tableCql, CreateTableStatement.RawStatement.class, "CREATE TABLE");
+        createStatement.prepareKeyspace(keyspaceName);
+
+        return createStatement;
+    }
+
+    public SchemaInsert getOfflineInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
+    {
+        assert tableCql != null;
+
+        CFMetaData cfMetaData = CFMetaData.compile(tableCql, keyspaceName);
+
+        List<ColumnDefinition> allColumns = com.google.common.collect.Lists.newArrayList(cfMetaData.allColumnsInSelectOrder());
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("INSERT INTO ").append(quoteIdentifier(keyspaceName) + "." + quoteIdentifier(tableName)).append(" (");
+        StringBuilder value = new StringBuilder();
+        for (ColumnDefinition c : allColumns)
+        {
+            sb.append(quoteIdentifier(c.name.toString())).append(", ");
+            value.append("?, ");
+        }
+        sb.delete(sb.lastIndexOf(","), sb.length());
+        value.delete(value.lastIndexOf(","), value.length());
+        sb.append(") ").append("values(").append(value).append(')');
+
+
+        if (insert == null)
+            insert = new HashMap<>();
+        lowerCase(insert);
+
+        partitions = select(settings.insert.batchsize, "partitions", "fixed(1)", insert, OptionDistribution.BUILDER);
+        selectchance = select(settings.insert.selectRatio, "select", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);
+        rowPopulation = select(settings.insert.rowPopulationRatio, "row-population", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);
+
+        if (generator.maxRowCount > 100 * 1000 * 1000)
+            System.err.printf("WARNING: You have defined a schema that permits very large partitions (%.0f max rows (>100M))%n", generator.maxRowCount);
+
+        String statement = sb.toString();
+
+        //CQLTableWriter requires the keyspace name be in the create statement
+        String tableCreate = tableCql.replaceFirst("\\s+\"?"+tableName+"\"?\\s+", " \""+keyspaceName+"\".\""+tableName+"\" ");
+
+
+        return new SchemaInsert(timer, settings, generator, seedManager, selectchance.get(), rowPopulation.get(), thriftInsertId, statement, tableCreate);
+    }
+
     public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
     {
         if (insertStatement == null)
@@ -562,12 +648,18 @@ public class StressProfile implements Serializable
             Set<ColumnMetadata> keyColumns = com.google.common.collect.Sets.newHashSet(tableMetaData.getPrimaryKey());
 
             for (ColumnMetadata metadata : tableMetaData.getPartitionKey())
-                partitionKeys.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+                partitionKeys.add(new ColumnInfo(metadata.getName(), metadata.getType().getName().toString(),
+                                                 metadata.getType().isCollection() ? metadata.getType().getTypeArguments().get(0).getName().toString() : "",
+                                                 columnConfigs.get(metadata.getName())));
             for (ColumnMetadata metadata : tableMetaData.getClusteringColumns())
-                clusteringColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+                clusteringColumns.add(new ColumnInfo(metadata.getName(), metadata.getType().getName().toString(),
+                                                     metadata.getType().isCollection() ? metadata.getType().getTypeArguments().get(0).getName().toString() : "",
+                                                     columnConfigs.get(metadata.getName())));
             for (ColumnMetadata metadata : tableMetaData.getColumns())
                 if (!keyColumns.contains(metadata))
-                    valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+                    valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType().getName().toString(),
+                                                    metadata.getType().isCollection() ? metadata.getType().getTypeArguments().get(0).getName().toString() : "",
+                                                    columnConfigs.get(metadata.getName())));
         }
 
         PartitionGenerator newGenerator(StressSettings settings)
@@ -587,66 +679,68 @@ public class StressProfile implements Serializable
     static class ColumnInfo
     {
         final String name;
-        final DataType type;
+        final String type;
+        final String collectionType;
         final GeneratorConfig config;
 
-        ColumnInfo(String name, DataType type, GeneratorConfig config)
+        ColumnInfo(String name, String type, String collectionType, GeneratorConfig config)
         {
             this.name = name;
             this.type = type;
+            this.collectionType = collectionType;
             this.config = config;
         }
 
         Generator getGenerator()
         {
-            return getGenerator(name, type, config);
+            return getGenerator(name, type, collectionType, config);
         }
 
-        static Generator getGenerator(final String name, final DataType type, GeneratorConfig config)
+        static Generator getGenerator(final String name, final String type, final String collectionType, GeneratorConfig config)
         {
-            switch (type.getName())
+            switch (type.toUpperCase())
             {
-                case ASCII:
-                case TEXT:
-                case VARCHAR:
+                case "ASCII":
+                case "TEXT":
+                case "VARCHAR":
                     return new Strings(name, config);
-                case BIGINT:
-                case COUNTER:
+                case "BIGINT":
+                case "COUNTER":
                     return new Longs(name, config);
-                case BLOB:
+                case "BLOB":
                     return new Bytes(name, config);
-                case BOOLEAN:
+                case "BOOLEAN":
                     return new Booleans(name, config);
-                case DECIMAL:
+                case "DECIMAL":
                     return new BigDecimals(name, config);
-                case DOUBLE:
+                case "DOUBLE":
                     return new Doubles(name, config);
-                case FLOAT:
+                case "FLOAT":
                     return new Floats(name, config);
-                case INET:
+                case "INET":
                     return new Inets(name, config);
-                case INT:
+                case "INT":
                     return new Integers(name, config);
-                case VARINT:
+                case "VARINT":
                     return new BigIntegers(name, config);
-                case TIMESTAMP:
+                case "TIMESTAMP":
                     return new Dates(name, config);
-                case UUID:
+                case "UUID":
                     return new UUIDs(name, config);
-                case TIMEUUID:
+                case "TIMEUUID":
                     return new TimeUUIDs(name, config);
-                case TINYINT:
+                case "TINYINT":
                     return new TinyInts(name, config);
-                case SMALLINT:
+                case "SMALLINT":
                     return new SmallInts(name, config);
-                case TIME:
+                case "TIME":
                     return new Times(name, config);
-                case DATE:
+                case "DATE":
                     return new LocalDates(name, config);
-                case SET:
-                    return new Sets(name, getGenerator(name, type.getTypeArguments().get(0), config), config);
-                case LIST:
-                    return new Lists(name, getGenerator(name, type.getTypeArguments().get(0), config), config);
+                case "SET":
+                    return new Sets(name, getGenerator(name, collectionType, null, config), config);
+                case "LIST":
+                    return new Lists(name, getGenerator(name, collectionType, null, config), config);
                 default:
                     throw new UnsupportedOperationException("Because of this name: "+name+" if you removed it from the yaml and are still seeing this, make sure to drop table");
             }

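Note on the new offline path above: getOfflineInsert() derives the write statement purely from the compiled CFMetaData, emitting one quoted column and one bind marker per column in select order. A minimal self-contained sketch of that string-building step follows; the keyspace "ks", table "tbl" and the column names are invented for illustration (the patch itself gets them from CFMetaData and quoteIdentifier()):

    // Sketch only: names are illustrative, not taken from this patch.
    public class OfflineInsertSketch
    {
        public static void main(String[] args)
        {
            String[] columns = { "pk", "ck", "val" };
            StringBuilder sb = new StringBuilder("INSERT INTO \"ks\".\"tbl\" (");
            StringBuilder value = new StringBuilder();
            for (String c : columns)
            {
                sb.append('"').append(c).append("\", ");
                value.append("?, ");
            }
            // trim the trailing ", " from both builders, as the patch does
            sb.delete(sb.lastIndexOf(","), sb.length());
            value.delete(value.lastIndexOf(","), value.length());
            sb.append(") ").append("values(").append(value).append(')');
            // Prints: INSERT INTO "ks"."tbl" ("pk", "ck", "val") values(?, ?, ?)
            System.out.println(sb);
        }
    }
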
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
index a7297c5..1230065 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
@@ -22,10 +22,8 @@ package org.apache.cassandra.stress.generate;
 
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
 
@@ -71,7 +69,7 @@ public class PartitionGenerator
         }
         this.maxRowCount = maxRowCount;
         this.minRowCount = minRowCount;
-        this.indexMap = new HashMap<>();
+        this.indexMap = new LinkedHashMap<>();
         int i = 0;
         for (Generator generator : partitionKey)
             indexMap.put(generator.name, --i);
@@ -110,4 +108,9 @@ public class PartitionGenerator
             return clusteringComponents.get(c).type.compose(v);
         return valueComponents.get(c - clusteringComponents.size()).type.compose(v);
     }
+
+    public List<String> getColumnNames()
+    {
+        return indexMap.keySet().stream().collect(Collectors.toList());
+    }
 }

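The switch from HashMap to LinkedHashMap above is what makes the new getColumnNames() useful: iteration order now follows insertion order, so the names come back in the same order the constructor registered the generators (partition key columns first, then the remaining columns). A small self-contained sketch, with invented column names:

    // Sketch of why LinkedHashMap matters here; column names are made up.
    import java.util.*;

    public class ColumnOrderSketch
    {
        public static void main(String[] args)
        {
            Map<String, Integer> indexMap = new LinkedHashMap<>();
            indexMap.put("pk", -1);  // partition key columns get negative indices (see the constructor)
            indexMap.put("ck", 0);   // clustering and value columns follow, per the unchanged code
            indexMap.put("val", 1);
            List<String> names = new ArrayList<>(indexMap.keySet());
            System.out.println(names); // [pk, ck, val] -- a plain HashMap gives no such ordering guarantee
        }
    }
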
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index d9fcac8..9eebce2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -21,15 +21,27 @@ package org.apache.cassandra.stress.operations.userdefined;
  */
 
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Statement;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.stress.WorkManager;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
@@ -39,12 +51,27 @@ import org.apache.cassandra.stress.util.Timer;
 public class SchemaInsert extends SchemaStatement
 {
 
+    private final String tableSchema;
+    private final String insertStatement;
     private final BatchStatement.Type batchType;
 
     public SchemaInsert(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Distribution batchSize, RatioDistribution useRatio, RatioDistribution rowPopulation, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio, rowPopulation), statement, thriftId, cl);
+        super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio, rowPopulation), statement, statement.getVariables().asList().stream().map(d -> d.getName()).collect(Collectors.toList()), thriftId, cl);
         this.batchType = batchType;
+        this.insertStatement = null;
+        this.tableSchema = null;
+    }
+
+    /**
+     * Special constructor for offline use
+     */
+    public SchemaInsert(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, RatioDistribution useRatio, RatioDistribution rowPopulation, Integer thriftId, String statement, String tableSchema)
+    {
+        super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), useRatio, rowPopulation), null, generator.getColumnNames(), thriftId, ConsistencyLevel.ONE);
+        this.batchType = BatchStatement.Type.UNLOGGED;
+        this.insertStatement = statement;
+        this.tableSchema = tableSchema;
     }
 
     private class JavaDriverRun extends Runner
@@ -113,6 +140,31 @@ public class SchemaInsert extends SchemaStatement
         }
     }
 
+    private class OfflineRun extends Runner
+    {
+        final CQLSSTableWriter writer;
+
+        OfflineRun(CQLSSTableWriter writer)
+        {
+            this.writer = writer;
+        }
+
+        public boolean run() throws Exception
+        {
+            for (PartitionIterator iterator : partitions)
+            {
+                while (iterator.hasNext())
+                {
+                    Row row = iterator.next();
+                    writer.rawAddRow(thriftRowArgs(row));
+                    rowCount += 1;
+                }
+            }
+
+            return true;
+        }
+    }
+
     @Override
     public void run(JavaDriverClient client) throws IOException
     {
@@ -130,4 +182,27 @@ public class SchemaInsert extends SchemaStatement
         timeWithRetry(new ThriftRun(client));
     }
 
+    public CQLSSTableWriter createWriter(ColumnFamilyStore cfs, int bufferSize, boolean makeRangeAware)
+    {
+        return CQLSSTableWriter.builder()
+                               .withCfs(cfs)
+                               .withBufferSizeInMB(bufferSize)
+                               .forTable(tableSchema)
+                               .using(insertStatement)
+                               .rangeAware(makeRangeAware)
+                               .build();
+    }
+
+    public void runOffline(CQLSSTableWriter writer, WorkManager workManager) throws Exception
+    {
+        OfflineRun offline = new OfflineRun(writer);
+
+        while (true)
+        {
+            if (ready(workManager) == 0)
+                break;
+
+            offline.run();
+        }
+    }
 }

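createWriter() and runOffline() above are the hooks for whatever command drives the offline mode; that caller is not part of this hunk. Purely as a hedged fragment (profile, timer, generator, seedManager, settings, cfs and workManager are assumed to exist in the caller, and exceptions propagate outward), the expected usage looks roughly like:

    // Hypothetical driver, not shown in this diff.
    SchemaInsert insert = profile.getOfflineInsert(timer, generator, seedManager, settings);
    try (CQLSSTableWriter writer = insert.createWriter(cfs, 128, false)) // 128 MB buffer, not range-aware
    {
        insert.runOffline(writer, workManager);
    }
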
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
index 9b5c4ae..0d8e756 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.stream.Collectors;
 
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
@@ -53,7 +54,8 @@ public class SchemaQuery extends SchemaStatement
 
     public SchemaQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ArgSelect argSelect)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, thriftId, cl);
+        super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement,
+              statement.getVariables().asList().stream().map(d -> d.getName()).collect(Collectors.toList()), thriftId, cl);
         this.argSelect = argSelect;
         randomBuffer = new Object[argumentIndex.length][argumentIndex.length];
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 2e03c69..c83787b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.stress.util.Timer;
 
 public abstract class SchemaStatement extends PartitionOperation
 {
-
     final PreparedStatement statement;
     final Integer thriftId;
     final ConsistencyLevel cl;
@@ -47,24 +46,27 @@ public abstract class SchemaStatement extends PartitionOperation
     final ColumnDefinitions definitions;
 
     public SchemaStatement(Timer timer, StressSettings settings, DataSpec spec,
-                           PreparedStatement statement, Integer thriftId, ConsistencyLevel cl)
+                           PreparedStatement statement, List<String> bindNames, Integer thriftId, ConsistencyLevel cl)
     {
         super(timer, settings, spec);
         this.statement = statement;
         this.thriftId = thriftId;
         this.cl = cl;
-        argumentIndex = new int[statement.getVariables().size()];
+        argumentIndex = new int[bindNames.size()];
         bindBuffer = new Object[argumentIndex.length];
-        definitions = statement.getVariables();
+        definitions = statement != null ? statement.getVariables() : null;
         int i = 0;
-        for (ColumnDefinitions.Definition definition : definitions)
-            argumentIndex[i++] = spec.partitionGenerator.indexOf(definition.getName());
+        for (String name : bindNames)
+            argumentIndex[i++] = spec.partitionGenerator.indexOf(name);
 
-        statement.setConsistencyLevel(JavaDriverClient.from(cl));
+        if (statement != null)
+            statement.setConsistencyLevel(JavaDriverClient.from(cl));
     }
 
     BoundStatement bindRow(Row row)
     {
+        assert statement != null;
+
         for (int i = 0 ; i < argumentIndex.length ; i++)
         {
             Object value = row.get(argumentIndex[i]);
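
The constructor change above decouples argumentIndex from the driver's PreparedStatement, so the offline path (which has no prepared statement) can still map bind positions to generator columns. The two call shapes now in use, shown as a fragment with illustrative variable names:

    // Online callers (SchemaInsert/SchemaQuery) pass the driver's variable names:
    List<String> bindNames = stmt.getVariables().asList().stream()
                                 .map(d -> d.getName())
                                 .collect(Collectors.toList());
    // The offline SchemaInsert constructor instead passes generator.getColumnNames()
    // together with a null PreparedStatement.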

Reply via email to