Author: xedin
Date: Mon Aug 22 15:31:59 2011
New Revision: 1160305

URL: http://svn.apache.org/viewvc?rev=1160305&view=rev
Log:
Add query-by-column mode to stress.java
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3064

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
    cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1160305&r1=1160304&r2=1160305&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Aug 22 15:31:59 2011
@@ -37,7 +37,7 @@
    and few other places responsible for work with SSTable files (CASSANDRA-3040)
  * Stop reading from sstables once we know we have the most recent columns,
    for query-by-name requests (CASSANDRA-2498)
-
+ * Add query-by-column mode to stress.java (CASSANDRA-3064)
 
 0.8.5
  * fix NPE when encryption_options is unspecified (CASSANDRA-3007)

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1160305&r1=1160304&r2=1160305&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java Mon Aug 22 15:31:59 2011
@@ -20,10 +20,14 @@ package org.apache.cassandra.stress;
 import java.io.*;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.db.ColumnFamilyType;
@@ -41,6 +45,9 @@ public class Session implements Serializ
     // command line options
     public static final Options availableOptions = new Options();
 
+    public static final String DEFAULT_COMPARATOR = "AsciiType";
+    public static final String DEFAULT_VALIDATOR  = "BytesType";
+
     public final AtomicInteger operations;
     public final AtomicInteger keys;
     public final AtomicLong    latency;
@@ -78,6 +85,7 @@ public class Session implements Serializ
         availableOptions.addOption("V",  "average-size-values",  false,  "Generate column values of average rather than specific size");
         availableOptions.addOption("T",  "send-to",              true,   "Send this as a request to the stress daemon at specified address.");
         availableOptions.addOption("I",  "compression",          false,  "Use sstable compression when creating schema");
+        availableOptions.addOption("Q",  "query-names",          true,   "Comma-separated list of column names to retrieve from each row.");
     }
 
     private int numKeys          = 1000 * 1000;
@@ -109,6 +117,9 @@ public class Session implements Serializ
     private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
     private Map<String, String> replicationStrategyOptions = new HashMap<String, String>();
 
+    // if we know exactly column names that we want to read (set by -Q option)
+    public final List<ByteBuffer> columnNames;
+
     public final boolean averageSizeValues;
 
     // required by Gaussian distribution.
@@ -275,11 +286,30 @@ public class Session implements Serializ
             {
                 throw new RuntimeException(e);
             }
+
+            if (cmd.hasOption("Q"))
+            {
+                AbstractType comparator = TypeParser.parse(DEFAULT_COMPARATOR);
+
+                String[] names = StringUtils.split(cmd.getOptionValue("Q"), ",");
+                columnNames = new ArrayList<ByteBuffer>(names.length);
+
+                for (String columnName : names)
+                    columnNames.add(comparator.fromString(columnName));
+            }
+            else
+            {
+                columnNames = null;
+            }
         }
         catch (ParseException e)
         {
             throw new IllegalArgumentException(e.getMessage(), e);
         }
+        catch (ConfigurationException e)
+        {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
 
         mean  = numDifferentKeys / 2;
         sigma = numDifferentKeys * STDev;
@@ -417,8 +447,11 @@ public class Session implements Serializ
 
         // column family for standard columns
         CfDef standardCfDef = new CfDef("Keyspace1", "Standard1");
-        System.out.println("Compression = " + compression);
-        standardCfDef.setComparator_type("AsciiType").setDefault_validation_class("BytesType").setCompression(compression);
+
+        standardCfDef.setComparator_type(DEFAULT_COMPARATOR)
+                     .setDefault_validation_class(DEFAULT_VALIDATOR)
+                     .setCompression(compression);
+
         if (indexType != null)
         {
             ColumnDef standardColumn = new ColumnDef(ByteBufferUtil.bytes("C1"), "BytesType");
@@ -428,7 +461,10 @@ public class Session implements Serializ
 
         // column family with super columns
         CfDef superCfDef = new CfDef("Keyspace1", "Super1").setColumn_type("Super");
-        superCfDef.setComparator_type("AsciiType").setSubcomparator_type("AsciiType").setDefault_validation_class("BytesType").setCompression(compression);
+        superCfDef.setComparator_type(DEFAULT_COMPARATOR)
+                  .setSubcomparator_type(DEFAULT_COMPARATOR)
+                  .setDefault_validation_class(DEFAULT_VALIDATOR)
+                  .setCompression(compression);
 
         // column family for standard counters
         CfDef counterCfDef = new CfDef("Keyspace1", "Counter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setCompression(compression);

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java?rev=1160305&r1=1160304&r2=1160305&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java Mon Aug 22 15:31:59 2011
@@ -37,16 +37,13 @@ public class Reader extends Operation
 
     public void run(Cassandra.Client client) throws IOException
     {
-        SliceRange sliceRange = new SliceRange();
-
-        // start/finish
-        sliceRange.setStart(new byte[] {}).setFinish(new byte[] {});
-
-        // reversed/count
-        sliceRange.setReversed(false).setCount(session.getColumnsPerKey());
-
         // initialize SlicePredicate with existing SliceRange
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(sliceRange);
+        SlicePredicate predicate = new SlicePredicate();
+
+        if (session.columnNames == null)
+            predicate.setSlice_range(getSliceRange());
+        else // see CASSANDRA-3064 about why this is useful
+            predicate.setColumn_names(session.columnNames);
 
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
         {
@@ -150,4 +147,12 @@ public class Reader extends Operation
         session.latency.getAndAdd(System.currentTimeMillis() - start);
     }
 
+    private SliceRange getSliceRange()
+    {
+        return new SliceRange()
+                    .setStart(new byte[] {})
+                    .setFinish(new byte[] {})
+                    .setReversed(false)
+                    .setCount(session.getColumnsPerKey());
+    }
 }


Reply via email to