Repository: cassandra Updated Branches: refs/heads/trunk d40ac784d -> 47d3b7e7a
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/StressProfile.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java index 8b59bda..1964c27 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java @@ -29,15 +29,19 @@ import java.net.URI; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import java.util.stream.Collectors; import com.google.common.base.Function; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.AlreadyExistsException; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement; +import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.stress.generate.*; @@ -57,12 +61,14 @@ import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.Constructor; import org.yaml.snakeyaml.error.YAMLException; +import static org.apache.cassandra.io.sstable.CQLSSTableWriter.parseStatement; + public class StressProfile implements Serializable { private String keyspaceCql; private String tableCql; private List<String> extraSchemaDefinitions; - private String seedStr; + public final String seedStr = "seed for stress"; public String keyspaceName; public String tableName; @@ -96,7 +102,6 @@ public class StressProfile implements Serializable keyspaceCql = yaml.keyspace_definition; tableName = yaml.table; tableCql = yaml.table_definition; - seedStr = "seed for stress"; queries = yaml.queries; tokenRangeQueries = yaml.token_range_queries; insert = yaml.insert; @@ -360,6 +365,87 @@ public class StressProfile implements Serializable return new TokenRangeQuery(timer, settings, tableMetaData, tokenRangeIterator, def, isWarmup); } + + public PartitionGenerator getOfflineGenerator() + { + CFMetaData cfMetaData = CFMetaData.compile(tableCql, keyspaceName); + + //Add missing column configs + Iterator<ColumnDefinition> it = cfMetaData.allColumnsInSelectOrder(); + while (it.hasNext()) + { + ColumnDefinition c = it.next(); + if (!columnConfigs.containsKey(c.name.toString())) + columnConfigs.put(c.name.toString(), new GeneratorConfig(seedStr + c.name.toString(), null, null, null)); + } + + List<Generator> partitionColumns = cfMetaData.partitionKeyColumns().stream() + .map(c -> new ColumnInfo(c.name.toString(), c.type.asCQL3Type().toString(), "", columnConfigs.get(c.name.toString()))) + .map(c -> c.getGenerator()) + .collect(Collectors.toList()); + + List<Generator> clusteringColumns = cfMetaData.clusteringColumns().stream() + .map(c -> new ColumnInfo(c.name.toString(), c.type.asCQL3Type().toString(), "", columnConfigs.get(c.name.toString()))) + .map(c -> c.getGenerator()) + .collect(Collectors.toList()); + + List<Generator> regularColumns = com.google.common.collect.Lists.newArrayList(cfMetaData.partitionColumns().selectOrderIterator()).stream() + .map(c -> new ColumnInfo(c.name.toString(), c.type.asCQL3Type().toString(), "", columnConfigs.get(c.name.toString()))) + .map(c -> c.getGenerator()) + .collect(Collectors.toList()); + + return new PartitionGenerator(partitionColumns, clusteringColumns, regularColumns, PartitionGenerator.Order.ARBITRARY); + } + + public CreateTableStatement.RawStatement getCreateStatement() + { + CreateTableStatement.RawStatement createStatement = parseStatement(tableCql, CreateTableStatement.RawStatement.class, "CREATE TABLE"); + createStatement.prepareKeyspace(keyspaceName); + + return createStatement; + } + + public SchemaInsert getOfflineInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) + { + assert tableCql != null; + + CFMetaData cfMetaData = CFMetaData.compile(tableCql, keyspaceName); + + List<ColumnDefinition> allColumns = com.google.common.collect.Lists.newArrayList(cfMetaData.allColumnsInSelectOrder()); + + StringBuilder sb = new StringBuilder(); + sb.append("INSERT INTO ").append(quoteIdentifier(keyspaceName) + "." + quoteIdentifier(tableName)).append(" ("); + StringBuilder value = new StringBuilder(); + for (ColumnDefinition c : allColumns) + { + sb.append(quoteIdentifier(c.name.toString())).append(", "); + value.append("?, "); + } + sb.delete(sb.lastIndexOf(","), sb.length()); + value.delete(value.lastIndexOf(","), value.length()); + sb.append(") ").append("values(").append(value).append(')'); + + + if (insert == null) + insert = new HashMap<>(); + lowerCase(insert); + + partitions = select(settings.insert.batchsize, "partitions", "fixed(1)", insert, OptionDistribution.BUILDER); + selectchance = select(settings.insert.selectRatio, "select", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER); + rowPopulation = select(settings.insert.rowPopulationRatio, "row-population", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER); + + if (generator.maxRowCount > 100 * 1000 * 1000) + System.err.printf("WARNING: You have defined a schema that permits very large partitions (%.0f max rows (>100M))%n", generator.maxRowCount); + + String statement = sb.toString(); + + //CQLTableWriter requires the keyspace name be in the create statement + String tableCreate = tableCql.replaceFirst("\\s+\"?"+tableName+"\"?\\s+", " \""+keyspaceName+"\".\""+tableName+"\" "); + + + return new SchemaInsert(timer, settings, generator, seedManager, selectchance.get(), rowPopulation.get(), thriftInsertId, statement, tableCreate); + } + public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { if (insertStatement == null) @@ -562,12 +648,18 @@ public class StressProfile implements Serializable Set<ColumnMetadata> keyColumns = com.google.common.collect.Sets.newHashSet(tableMetaData.getPrimaryKey()); for (ColumnMetadata metadata : tableMetaData.getPartitionKey()) - partitionKeys.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName()))); + partitionKeys.add(new ColumnInfo(metadata.getName(), metadata.getType().getName().toString(), + metadata.getType().isCollection() ? metadata.getType().getTypeArguments().get(0).getName().toString() : "", + columnConfigs.get(metadata.getName()))); for (ColumnMetadata metadata : tableMetaData.getClusteringColumns()) - clusteringColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName()))); + clusteringColumns.add(new ColumnInfo(metadata.getName(), metadata.getType().getName().toString(), + metadata.getType().isCollection() ? metadata.getType().getTypeArguments().get(0).getName().toString() : "", + columnConfigs.get(metadata.getName()))); for (ColumnMetadata metadata : tableMetaData.getColumns()) if (!keyColumns.contains(metadata)) - valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName()))); + valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType().getName().toString(), + metadata.getType().isCollection() ? metadata.getType().getTypeArguments().get(0).getName().toString() : "", + columnConfigs.get(metadata.getName()))); } PartitionGenerator newGenerator(StressSettings settings) @@ -587,66 +679,68 @@ public class StressProfile implements Serializable static class ColumnInfo { final String name; - final DataType type; + final String type; + final String collectionType; final GeneratorConfig config; - ColumnInfo(String name, DataType type, GeneratorConfig config) + ColumnInfo(String name, String type, String collectionType, GeneratorConfig config) { this.name = name; this.type = type; + this.collectionType = collectionType; this.config = config; } Generator getGenerator() { - return getGenerator(name, type, config); + return getGenerator(name, type, collectionType, config); } - static Generator getGenerator(final String name, final DataType type, GeneratorConfig config) + static Generator getGenerator(final String name, final String type, final String collectionType, GeneratorConfig config) { - switch (type.getName()) + switch (type.toUpperCase()) { - case ASCII: - case TEXT: - case VARCHAR: + case "ASCII": + case "TEXT": + case "VARCHAR": return new Strings(name, config); - case BIGINT: - case COUNTER: + case "BIGINT": + case "COUNTER": return new Longs(name, config); - case BLOB: + case "BLOB": return new Bytes(name, config); - case BOOLEAN: + case "BOOLEAN": return new Booleans(name, config); - case DECIMAL: + case "DECIMAL": return new BigDecimals(name, config); - case DOUBLE: + case "DOUBLE": return new Doubles(name, config); - case FLOAT: + case "FLOAT": return new Floats(name, config); - case INET: + case "INET": return new Inets(name, config); - case INT: + case "INT": return new Integers(name, config); - case VARINT: + case "VARINT": return new BigIntegers(name, config); - case TIMESTAMP: + case "TIMESTAMP": return new Dates(name, config); - case UUID: + case "UUID": return new UUIDs(name, config); - case TIMEUUID: + case "TIMEUUID": return new TimeUUIDs(name, config); - case TINYINT: + case "TINYINT": return new TinyInts(name, config); - case SMALLINT: + case "SMALLINT": return new SmallInts(name, config); - case TIME: + case "TIME": return new Times(name, config); - case DATE: + case "DATE": return new LocalDates(name, config); - case SET: - return new Sets(name, getGenerator(name, type.getTypeArguments().get(0), config), config); - case LIST: - return new Lists(name, getGenerator(name, type.getTypeArguments().get(0), config), config); + case "SET": + return new Sets(name, getGenerator(name, collectionType, null, config), config); + case "LIST": + return new Lists(name, getGenerator(name, collectionType, null, config), config); default: throw new UnsupportedOperationException("Because of this name: "+name+" if you removed it from the yaml and are still seeing this, make sure to drop table"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java index a7297c5..1230065 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java @@ -22,10 +22,8 @@ package org.apache.cassandra.stress.generate; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; +import java.util.*; +import java.util.stream.Collectors; import com.google.common.collect.Iterables; @@ -71,7 +69,7 @@ public class PartitionGenerator } this.maxRowCount = maxRowCount; this.minRowCount = minRowCount; - this.indexMap = new HashMap<>(); + this.indexMap = new LinkedHashMap<>(); int i = 0; for (Generator generator : partitionKey) indexMap.put(generator.name, --i); @@ -110,4 +108,9 @@ public class PartitionGenerator return clusteringComponents.get(c).type.compose(v); return valueComponents.get(c - clusteringComponents.size()).type.compose(v); } + + public List<String> getColumnNames() + { + return indexMap.keySet().stream().collect(Collectors.toList()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java index d9fcac8..9eebce2 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java @@ -21,15 +21,27 @@ package org.apache.cassandra.stress.operations.userdefined; */ +import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import com.google.common.util.concurrent.Uninterruptibles; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Statement; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.cassandra.stress.WorkManager; import org.apache.cassandra.stress.generate.*; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.JavaDriverClient; @@ -39,12 +51,27 @@ import org.apache.cassandra.stress.util.Timer; public class SchemaInsert extends SchemaStatement { + private final String tableSchema; + private final String insertStatement; private final BatchStatement.Type batchType; public SchemaInsert(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Distribution batchSize, RatioDistribution useRatio, RatioDistribution rowPopulation, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType) { - super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio, rowPopulation), statement, thriftId, cl); + super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio, rowPopulation), statement, statement.getVariables().asList().stream().map(d -> d.getName()).collect(Collectors.toList()), thriftId, cl); this.batchType = batchType; + this.insertStatement = null; + this.tableSchema = null; + } + + /** + * Special constructor for offline use + */ + public SchemaInsert(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, RatioDistribution useRatio, RatioDistribution rowPopulation, Integer thriftId, String statement, String tableSchema) + { + super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), useRatio, rowPopulation), null, generator.getColumnNames(), thriftId, ConsistencyLevel.ONE); + this.batchType = BatchStatement.Type.UNLOGGED; + this.insertStatement = statement; + this.tableSchema = tableSchema; } private class JavaDriverRun extends Runner @@ -113,6 +140,31 @@ public class SchemaInsert extends SchemaStatement } } + private class OfflineRun extends Runner + { + final CQLSSTableWriter writer; + + OfflineRun(CQLSSTableWriter writer) + { + this.writer = writer; + } + + public boolean run() throws Exception + { + for (PartitionIterator iterator : partitions) + { + while (iterator.hasNext()) + { + Row row = iterator.next(); + writer.rawAddRow(thriftRowArgs(row)); + rowCount += 1; + } + } + + return true; + } + } + @Override public void run(JavaDriverClient client) throws IOException { @@ -130,4 +182,27 @@ public class SchemaInsert extends SchemaStatement timeWithRetry(new ThriftRun(client)); } + public CQLSSTableWriter createWriter(ColumnFamilyStore cfs, int bufferSize, boolean makeRangeAware) + { + return CQLSSTableWriter.builder() + .withCfs(cfs) + .withBufferSizeInMB(bufferSize) + .forTable(tableSchema) + .using(insertStatement) + .rangeAware(makeRangeAware) + .build(); + } + + public void runOffline(CQLSSTableWriter writer, WorkManager workManager) throws Exception + { + OfflineRun offline = new OfflineRun(writer); + + while (true) + { + if (ready(workManager) == 0) + break; + + offline.run(); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java index 9b5c4ae..0d8e756 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.stream.Collectors; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; @@ -53,7 +54,8 @@ public class SchemaQuery extends SchemaStatement public SchemaQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ArgSelect argSelect) { - super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, thriftId, cl); + super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, + statement.getVariables().asList().stream().map(d -> d.getName()).collect(Collectors.toList()), thriftId, cl); this.argSelect = argSelect; randomBuffer = new Object[argumentIndex.length][argumentIndex.length]; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java index 2e03c69..c83787b 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java @@ -38,7 +38,6 @@ import org.apache.cassandra.stress.util.Timer; public abstract class SchemaStatement extends PartitionOperation { - final PreparedStatement statement; final Integer thriftId; final ConsistencyLevel cl; @@ -47,24 +46,27 @@ public abstract class SchemaStatement extends PartitionOperation final ColumnDefinitions definitions; public SchemaStatement(Timer timer, StressSettings settings, DataSpec spec, - PreparedStatement statement, Integer thriftId, ConsistencyLevel cl) + PreparedStatement statement, List<String> bindNames, Integer thriftId, ConsistencyLevel cl) { super(timer, settings, spec); this.statement = statement; this.thriftId = thriftId; this.cl = cl; - argumentIndex = new int[statement.getVariables().size()]; + argumentIndex = new int[bindNames.size()]; bindBuffer = new Object[argumentIndex.length]; - definitions = statement.getVariables(); + definitions = statement != null ? statement.getVariables() : null; int i = 0; - for (ColumnDefinitions.Definition definition : definitions) - argumentIndex[i++] = spec.partitionGenerator.indexOf(definition.getName()); + for (String name : bindNames) + argumentIndex[i++] = spec.partitionGenerator.indexOf(name); - statement.setConsistencyLevel(JavaDriverClient.from(cl)); + if (statement != null) + statement.setConsistencyLevel(JavaDriverClient.from(cl)); } BoundStatement bindRow(Row row) { + assert statement != null; + for (int i = 0 ; i < argumentIndex.length ; i++) { Object value = row.get(argumentIndex[i]);