Author: xedin
Date: Mon Aug 22 15:31:59 2011
New Revision: 1160305

URL: http://svn.apache.org/viewvc?rev=1160305&view=rev
Log:
Add query-by-column mode to stress.java
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3064
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1160305&r1=1160304&r2=1160305&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Mon Aug 22 15:31:59 2011 @@ -37,7 +37,7 @@ and few other places responsible for work with SSTable files (CASSANDRA-3040) * Stop reading from sstables once we know we have the most recent columns, for query-by-name requests (CASSANDRA-2498) - + * Add query-by-column mode to stress.java (CASSANDRA-3064) 0.8.5 * fix NPE when encryption_options is unspecified (CASSANDRA-3007) Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1160305&r1=1160304&r2=1160305&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java Mon Aug 22 15:31:59 2011 @@ -20,10 +20,14 @@ package org.apache.cassandra.stress; import java.io.*; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.TypeParser; import org.apache.commons.cli.*; import org.apache.cassandra.db.ColumnFamilyType; @@ -41,6 +45,9 @@ public class Session implements Serializ // command line options 
public static final Options availableOptions = new Options(); + public static final String DEFAULT_COMPARATOR = "AsciiType"; + public static final String DEFAULT_VALIDATOR = "BytesType"; + public final AtomicInteger operations; public final AtomicInteger keys; public final AtomicLong latency; @@ -78,6 +85,7 @@ public class Session implements Serializ availableOptions.addOption("V", "average-size-values", false, "Generate column values of average rather than specific size"); availableOptions.addOption("T", "send-to", true, "Send this as a request to the stress daemon at specified address."); availableOptions.addOption("I", "compression", false, "Use sstable compression when creating schema"); + availableOptions.addOption("Q", "query-names", true, "Comma-separated list of column names to retrieve from each row."); } private int numKeys = 1000 * 1000; @@ -109,6 +117,9 @@ public class Session implements Serializ private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy"; private Map<String, String> replicationStrategyOptions = new HashMap<String, String>(); + // if we know exactly column names that we want to read (set by -Q option) + public final List<ByteBuffer> columnNames; + public final boolean averageSizeValues; // required by Gaussian distribution. 
@@ -275,11 +286,30 @@ public class Session implements Serializ { throw new RuntimeException(e); } + + if (cmd.hasOption("Q")) + { + AbstractType comparator = TypeParser.parse(DEFAULT_COMPARATOR); + + String[] names = StringUtils.split(cmd.getOptionValue("Q"), ","); + columnNames = new ArrayList<ByteBuffer>(names.length); + + for (String columnName : names) + columnNames.add(comparator.fromString(columnName)); + } + else + { + columnNames = null; + } } catch (ParseException e) { throw new IllegalArgumentException(e.getMessage(), e); } + catch (ConfigurationException e) + { + throw new IllegalStateException(e.getMessage(), e); + } mean = numDifferentKeys / 2; sigma = numDifferentKeys * STDev; @@ -417,8 +447,11 @@ public class Session implements Serializ // column family for standard columns CfDef standardCfDef = new CfDef("Keyspace1", "Standard1"); - System.out.println("Compression = " + compression); - standardCfDef.setComparator_type("AsciiType").setDefault_validation_class("BytesType").setCompression(compression); + + standardCfDef.setComparator_type(DEFAULT_COMPARATOR) + .setDefault_validation_class(DEFAULT_VALIDATOR) + .setCompression(compression); + if (indexType != null) { ColumnDef standardColumn = new ColumnDef(ByteBufferUtil.bytes("C1"), "BytesType"); @@ -428,7 +461,10 @@ public class Session implements Serializ // column family with super columns CfDef superCfDef = new CfDef("Keyspace1", "Super1").setColumn_type("Super"); - superCfDef.setComparator_type("AsciiType").setSubcomparator_type("AsciiType").setDefault_validation_class("BytesType").setCompression(compression); + superCfDef.setComparator_type(DEFAULT_COMPARATOR) + .setSubcomparator_type(DEFAULT_COMPARATOR) + .setDefault_validation_class(DEFAULT_VALIDATOR) + .setCompression(compression); // column family for standard counters CfDef counterCfDef = new CfDef("Keyspace1", "Counter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setCompression(compression); 
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java?rev=1160305&r1=1160304&r2=1160305&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java Mon Aug 22 15:31:59 2011 @@ -37,16 +37,13 @@ public class Reader extends Operation public void run(Cassandra.Client client) throws IOException { - SliceRange sliceRange = new SliceRange(); - - // start/finish - sliceRange.setStart(new byte[] {}).setFinish(new byte[] {}); - - // reversed/count - sliceRange.setReversed(false).setCount(session.getColumnsPerKey()); - // initialize SlicePredicate with existing SliceRange - SlicePredicate predicate = new SlicePredicate().setSlice_range(sliceRange); + SlicePredicate predicate = new SlicePredicate(); + + if (session.columnNames == null) + predicate.setSlice_range(getSliceRange()); + else // see CASSANDRA-3064 about why this is useful + predicate.setColumn_names(session.columnNames); if (session.getColumnFamilyType() == ColumnFamilyType.Super) { @@ -150,4 +147,12 @@ public class Reader extends Operation session.latency.getAndAdd(System.currentTimeMillis() - start); } + private SliceRange getSliceRange() + { + return new SliceRange() + .setStart(new byte[] {}) + .setFinish(new byte[] {}) + .setReversed(false) + .setCount(session.getColumnsPerKey()); + } }