Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 e6f027979 -> 3bee990ca
  refs/heads/trunk 47964b766 -> ed0840299


cassandra-stress supports validation operations over user profiles

patch by benedict; reviewed by snazy for CASSANDRA-8773


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3bee990c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3bee990c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3bee990c

Branch: refs/heads/cassandra-2.1
Commit: 3bee990ca2e46bf0fd5742c56b5d00cc0566950b
Parents: e6f0279
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Mon May 4 18:01:38 2015 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Mon May 4 18:01:38 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/stress/Operation.java  |  13 +-
 .../apache/cassandra/stress/StressAction.java   |   4 +-
 .../apache/cassandra/stress/StressProfile.java  |  25 +-
 .../stress/generate/PartitionGenerator.java     |   9 +
 .../stress/generate/PartitionIterator.java      | 166 +++++++--
 .../apache/cassandra/stress/generate/Row.java   |  15 +-
 .../SampledOpDistributionFactory.java           |  26 +-
 .../operations/userdefined/SchemaInsert.java    |  14 +-
 .../operations/userdefined/SchemaQuery.java     |   7 +-
 .../operations/userdefined/SchemaStatement.java |  35 +-
 .../userdefined/ValidatingSchemaQuery.java      | 359 +++++++++++++++++++
 .../SettingsCommandPreDefinedMixed.java         |   9 +-
 .../stress/settings/SettingsCommandUser.java    |   9 +-
 .../stress/settings/ValidationType.java         |  29 --
 15 files changed, 581 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e7689ab..3a2daa7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.6
+ * cassandra-stress supports validation operations over user profiles 
(CASSANDRA-8773)
  * Add support for rate limiting log messages (CASSANDRA-9029)
  * Log the partition key with tombstone warnings (CASSANDRA-8561)
  * Reduce runWithCompactionsDisabled poll interval to 1ms (CASSANDRA-9271)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java 
b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index f4ac5ee..1179f71 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -104,10 +104,7 @@ public abstract class Operation
                 if (seed == null)
                     break;
 
-                if (spec.useRatio == null)
-                    success = partitionCache.get(i).reset(seed, 
spec.targetCount, isWrite());
-                else
-                    success = partitionCache.get(i).reset(seed, 
spec.useRatio.next(), isWrite());
+                success = reset(seed, partitionCache.get(i));
             }
         }
         partitionCount = i;
@@ -119,6 +116,14 @@ public abstract class Operation
         return !partitions.isEmpty();
     }
 
+    protected boolean reset(Seed seed, PartitionIterator iterator)
+    {
+        if (spec.useRatio == null)
+            return iterator.reset(seed, spec.targetCount, isWrite());
+        else
+            return iterator.reset(seed, spec.useRatio.next(), isWrite());
+    }
+
     public boolean isWrite()
     {
         return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java 
b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index f906a55..158a278 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -88,7 +88,9 @@ public class StressAction implements Runnable
         // warmup - do 50k iterations; by default hotspot compiles methods 
after 10k invocations
         PrintStream warmupOutput = new PrintStream(new OutputStream() { 
@Override public void write(int b) throws IOException { } } );
         int iterations = 50000 * settings.node.nodes.size();
-        int threads = 20;
+        int threads = 100;
+        if (iterations > settings.command.count && settings.command.count > 0)
+            return;
 
         if (settings.rate.maxThreads > 0)
             threads = Math.min(threads, settings.rate.maxThreads);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java 
b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 6c73214..49c4682 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.values.*;
 import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
 import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
+import 
org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery;
 import org.apache.cassandra.stress.settings.*;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
@@ -75,6 +76,7 @@ public class StressProfile implements Serializable
     transient volatile RatioDistributionFactory selectchance;
     transient volatile PreparedStatement insertStatement;
     transient volatile Integer thriftInsertId;
+    transient volatile List<ValidatingSchemaQuery.Factory> validationFactories;
 
     transient volatile Map<String, SchemaQuery.ArgSelect> argSelects;
     transient volatile Map<String, PreparedStatement> queryStatements;
@@ -265,12 +267,11 @@ public class StressProfile implements Serializable
             }
         }
 
-        // TODO validation
         name = name.toLowerCase();
         if (!queryStatements.containsKey(name))
             throw new IllegalArgumentException("No query defined with name " + 
name);
         return new SchemaQuery(timer, settings, generator, seeds, 
thriftQueryIds.get(name), queryStatements.get(name),
-                               
ThriftConversion.fromThrift(settings.command.consistencyLevel), 
ValidationType.NOT_FAIL, argSelects.get(name));
+                               
ThriftConversion.fromThrift(settings.command.consistencyLevel), 
argSelects.get(name));
     }
 
     public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, 
SeedManager seedManager, StressSettings settings)
@@ -388,6 +389,26 @@ public class StressProfile implements Serializable
         return new SchemaInsert(timer, settings, generator, seedManager, 
partitions.get(), selectchance.get(), thriftInsertId, insertStatement, 
ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
     }
 
+    public List<ValidatingSchemaQuery> getValidate(Timer timer, 
PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
+    {
+        if (validationFactories == null)
+        {
+            synchronized (this)
+            {
+                if (validationFactories == null)
+                {
+                    maybeLoadSchemaInfo(settings);
+                    validationFactories = 
ValidatingSchemaQuery.create(tableMetaData, settings);
+                }
+            }
+        }
+
+        List<ValidatingSchemaQuery> queries = new ArrayList<>();
+        for (ValidatingSchemaQuery.Factory factory : validationFactories)
+            queries.add(factory.create(timer, settings, generator, 
seedManager, ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+        return queries;
+    }
+
     private static <E> E select(E first, String key, String defValue, 
Map<String, String> map, Function<String, E> builder)
     {
         String val = map.remove(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
index 9f88068..a7297c5 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
@@ -101,4 +101,13 @@ public class PartitionGenerator
             return clusteringComponents.get(c).type.decompose(v);
         return valueComponents.get(c - 
clusteringComponents.size()).type.decompose(v);
     }
+
+    public Object convert(int c, ByteBuffer v)
+    {
+        if (c < 0)
+            return partitionKey.get(-1-c).type.compose(v);
+        if (c < clusteringComponents.size())
+            return clusteringComponents.get(c).type.compose(v);
+        return valueComponents.get(c - 
clusteringComponents.size()).type.compose(v);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
index 0466edb..4906b95 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -36,10 +36,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.values.Generator;
+import org.apache.cassandra.utils.Pair;
 
 // a partition is re-used to reduce garbage generation, as is its internal 
RowIterator
 // TODO: we should batch the generation of clustering components so we can 
bound the time and size necessary to
@@ -50,8 +52,11 @@ import org.apache.cassandra.stress.generate.values.Generator;
 public abstract class PartitionIterator implements Iterator<Row>
 {
 
-    abstract boolean reset(double useChance, int targetCount, boolean isWrite);
+    abstract boolean reset(double useChance, int targetCount, boolean isWrite, 
PartitionGenerator.Order order);
+    // picks random (inclusive) bounds to iterate, and returns them
+    public abstract Pair<Row, Row> resetToBounds(Seed seed, int 
clusteringComponentDepth);
 
+    PartitionGenerator.Order order;
     long idseed;
     Seed seed;
 
@@ -78,7 +83,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
         this.row = new Row(partitionKey, new 
Object[generator.clusteringComponents.size() + 
generator.valueComponents.size()]);
     }
 
-    private void setSeed(Seed seed)
+    void setSeed(Seed seed)
     {
         long idseed = 0;
         for (int i = 0 ; i < partitionKey.length ; i++)
@@ -98,13 +103,15 @@ public abstract class PartitionIterator implements 
Iterator<Row>
     public boolean reset(Seed seed, double useChance, boolean isWrite)
     {
         setSeed(seed);
-        return reset(useChance, 0, isWrite);
+        this.order = generator.order;
+        return reset(useChance, 0, isWrite, PartitionIterator.this.order);
     }
 
     public boolean reset(Seed seed, int targetCount, boolean isWrite)
     {
         setSeed(seed);
-        return reset(Double.NaN, targetCount, isWrite);
+        this.order = generator.order;
+        return reset(Double.NaN, targetCount, isWrite, 
PartitionIterator.this.order);
     }
 
     static class SingleRowIterator extends PartitionIterator
@@ -117,7 +124,15 @@ public abstract class PartitionIterator implements 
Iterator<Row>
             super(generator, seedManager);
         }
 
-        boolean reset(double useChance, int targetCount, boolean isWrite)
+        public Pair<Row, Row> resetToBounds(Seed seed, int 
clusteringComponentDepth)
+        {
+            assert clusteringComponentDepth == 0;
+            setSeed(seed);
+            reset(1d, 1, false, PartitionGenerator.Order.SORTED);
+            return Pair.create(new Row(partitionKey), new Row(partitionKey));
+        }
+
+        boolean reset(double useChance, int targetCount, boolean isWrite, 
PartitionGenerator.Order order)
         {
             done = false;
             this.isWrite = isWrite;
@@ -201,16 +216,11 @@ public abstract class PartitionIterator implements 
Iterator<Row>
          *
          * @return true if there is data to return, false otherwise
          */
-        boolean reset(double useChance, int targetCount, boolean isWrite)
+        boolean reset(double useChance, int targetCount, boolean isWrite, 
PartitionGenerator.Order order)
         {
             this.isWrite = isWrite;
-            if (this.useChance < 1d)
-            {
-                // we clear our prior roll-modifiers if the use chance was 
previously less-than zero
-                Arrays.fill(rollmodifier, 1d);
-                Arrays.fill(chancemodifier, 1d);
-            }
 
+            this.order = order;
             // set the seed for the first clustering component
             generator.clusteringComponents.get(0).setSeed(idseed);
 
@@ -229,7 +239,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
 
             if (Double.isNaN(useChance))
                 useChance = Math.max(0d, Math.min(1d, targetCount / (double) 
expectedRowCount));
-            this.useChance = useChance;
+            setUseChance(useChance);
 
             while (true)
             {
@@ -239,8 +249,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
 
                 for (Queue<?> q : clusteringComponents)
                     q.clear();
-                clusteringSeeds[0] = idseed;
-                fill(clusteringComponents[0], firstComponentCount, 
generator.clusteringComponents.get(0));
+                fill(0);
 
                 if (!isWrite)
                 {
@@ -249,8 +258,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
                     return true;
                 }
 
-
-                int count = Math.max(1, expectedRowCount / seed.visits);
+                int count = seed.visits == 1 ? 1 + (int) generator.maxRowCount 
: Math.max(1, expectedRowCount / seed.visits);
                 position = seed.moveForwards(count);
                 isFirstWrite = position == 0;
                 setLastRow(position + count - 1);
@@ -266,6 +274,44 @@ public abstract class PartitionIterator implements 
Iterator<Row>
             }
         }
 
+        void setUseChance(double useChance)
+        {
+            if (this.useChance < 1d)
+            {
+                // we clear our prior roll-modifiers if the use chance was 
previously less-than zero
+                Arrays.fill(rollmodifier, 1d);
+                Arrays.fill(chancemodifier, 1d);
+            }
+            this.useChance = useChance;
+        }
+
+        public Pair<Row, Row> resetToBounds(Seed seed, int 
clusteringComponentDepth)
+        {
+            setSeed(seed);
+            setUseChance(1d);
+            if (clusteringComponentDepth == 0)
+            {
+                reset(1d, -1, false, PartitionGenerator.Order.SORTED);
+                return Pair.create(new Row(partitionKey), new 
Row(partitionKey));
+            }
+
+            this.order = PartitionGenerator.Order.SORTED;
+            assert clusteringComponentDepth <= clusteringComponents.length;
+            for (Queue<?> q : clusteringComponents)
+                q.clear();
+
+            fill(0);
+            Pair<int[], Object[]> bound1 = 
randomBound(clusteringComponentDepth);
+            Pair<int[], Object[]> bound2 = 
randomBound(clusteringComponentDepth);
+            if (compare(bound1.left, bound2.left) > 0) { Pair<int[], Object[]> 
tmp = bound1; bound1 = bound2; bound2 = tmp;}
+            Arrays.fill(lastRow, 0);
+            System.arraycopy(bound2.left, 0, lastRow, 0, bound2.left.length);
+            Arrays.fill(currentRow, 0);
+            System.arraycopy(bound1.left, 0, currentRow, 0, 
bound1.left.length);
+            seekToCurrentRow();
+            return Pair.create(new Row(partitionKey, bound1.right), new 
Row(partitionKey, bound2.right));
+        }
+
         // returns expected row count
         private int setNoLastRow(int firstComponentCount)
         {
@@ -296,12 +342,39 @@ public abstract class PartitionIterator implements 
Iterator<Row>
         // OR if that row does not exist, it is the last row prior to it
         private int compareToLastRow(int depth)
         {
+            int prev = 0;
             for (int i = 0 ; i <= depth ; i++)
             {
                 int p = currentRow[i], l = lastRow[i], r = 
clusteringComponents[i].size();
-                if ((p == l) | (r == 1))
-                    continue;
-                return p - l;
+                if (prev < 0)
+                {
+                    // if we're behind our last position in theory, and have 
known more items to visit in practice
+                    // we're definitely behind our last row
+                    if (r > 1)
+                        return -1;
+                    // otherwise move forwards to see if we might have more to 
visit
+                }
+                else if (p > l)
+                {
+                    // prev must be == 0, so if p > l, we're after our last row
+                    return 1;
+                }
+                else if (p == l)
+                {
+                    // if we're equal to our last row up to our current depth, 
then we need to loop and look forwards
+                }
+                else if (r == 1)
+                {
+                    // if this is our last item in practice, store if we're 
behind our theoretical position
+                    // and move forwards; if every remaining practical item is 
1, we're at the last row
+                    // otherwise we're before it
+                    prev = p - l;
+                }
+                else
+                {
+                    // p < l, and r > 1, so we're definitely not at the end
+                    return -1;
+                }
             }
             return 0;
         }
@@ -330,6 +403,14 @@ public abstract class PartitionIterator implements 
Iterator<Row>
             }
         }
 
+        private static int compare(int[] l, int[] r)
+        {
+            for (int i = 0 ; i < l.length ; i++)
+                if (l[i] != r[i])
+                    return Integer.compare(l[i], r[i]);
+            return 0;
+        }
+
         static enum State
         {
             END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
@@ -349,9 +430,12 @@ public abstract class PartitionIterator implements 
Iterator<Row>
                 clusteringComponents[0].addFirst(this);
                 return setHasNext(advance(0, true));
             }
-
+            decompose(scalar, this.currentRow);
+            return seekToCurrentRow();
+        }
+        private State seekToCurrentRow()
+        {
             int[] position = this.currentRow;
-            decompose(scalar, position);
             for (int i = 0 ; i < position.length ; i++)
             {
                 if (i != 0)
@@ -399,7 +483,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
 
         // normal method for moving the iterator forward; maintains the row 
object, and delegates to advance(int)
         // to move the iterator to the next item
-        void advance()
+        Row advance()
         {
             // we are always at the leaf level when this method is invoked
             // so we calculate the seed for generating the row by combining 
the seed that generated the clustering components
@@ -407,16 +491,18 @@ public abstract class PartitionIterator implements 
Iterator<Row>
             long parentSeed = clusteringSeeds[depth];
             long rowSeed = seed(clusteringComponents[depth].peek(), 
generator.clusteringComponents.get(depth).type, parentSeed);
 
+            Row result = row.copy();
             // and then fill the row with the _non-clustering_ values for the 
position we _were_ at, as this is what we'll deliver
             for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
             {
                 Generator gen = generator.valueComponents.get(i - 
clusteringSeeds.length);
                 gen.setSeed(rowSeed);
-                row.row[i] = gen.generate();
+                result.row[i] = gen.generate();
             }
 
             // then we advance the leaf level
             setHasNext(advance(depth, false));
+            return result;
         }
 
         private boolean advance(int depth, boolean first)
@@ -481,15 +567,33 @@ public abstract class PartitionIterator implements 
Iterator<Row>
             }
         }
 
+        private Pair<int[], Object[]> randomBound(int clusteringComponentDepth)
+        {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            int[] position = new int[clusteringComponentDepth];
+            Object[] bound = new Object[clusteringComponentDepth];
+            position[0] = rnd.nextInt(clusteringComponents[0].size());
+            bound[0] = Iterables.get(clusteringComponents[0], position[0]);
+            for (int d = 1 ; d < clusteringComponentDepth ; d++)
+            {
+                fill(d);
+                position[d] = rnd.nextInt(clusteringComponents[d].size());
+                bound[d] = Iterables.get(clusteringComponents[d], position[d]);
+            }
+            for (int d = 1 ; d < clusteringComponentDepth ; d++)
+                clusteringComponents[d].clear();
+            return Pair.create(position, bound);
+        }
+
         // generate the clustering components for the provided depth; requires 
preceding components
         // to have been generated and their seeds populated into 
clusteringSeeds
         void fill(int depth)
         {
-            long seed = clusteringSeeds[depth - 1];
+            long seed = depth == 0 ? idseed : clusteringSeeds[depth - 1];
             Generator gen = generator.clusteringComponents.get(depth);
             gen.setSeed(seed);
-            clusteringSeeds[depth] = seed(clusteringComponents[depth - 
1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
             fill(clusteringComponents[depth], (int) 
gen.clusteringDistribution.next(), gen);
+            clusteringSeeds[depth] = seed(clusteringComponents[depth].peek(), 
generator.clusteringComponents.get(depth).type, seed);
         }
 
         // generate the clustering components into the queue
@@ -501,7 +605,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
                 return;
             }
 
-            switch (this.generator.order)
+            switch (order)
             {
                 case SORTED:
                     if (Comparable.class.isAssignableFrom(generator.clazz))
@@ -511,7 +615,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
                             tosort.add(generator.generate());
                         Collections.sort((List<Comparable>) (List<?>) tosort);
                         for (int i = 0 ; i < count ; i++)
-                            if (i == 0 || ((Comparable) tosort.get(i - 
1)).compareTo(i) < 0)
+                            if (i == 0 || ((Comparable) tosort.get(i - 
1)).compareTo(tosort.get(i)) < 0)
                                 queue.add(tosort.get(i));
                         break;
                     }
@@ -556,8 +660,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
         {
             if (!hasNext())
                 throw new NoSuchElementException();
-            advance();
-            return row;
+            return advance();
         }
 
         public boolean finishedPartition()
@@ -646,5 +749,4 @@ public abstract class PartitionIterator implements 
Iterator<Row>
     {
         return generator.partitionKey.get(0).type.decompose(partitionKey[0]);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/generate/Row.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Row.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/Row.java
index 421dbbf..bf547c8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/Row.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Row.java
@@ -23,9 +23,16 @@ package org.apache.cassandra.stress.generate;
 
 public class Row
 {
+    private static final Object[] EMPTY_ROW_DATA = new Object[0];
 
-    final Object[] partitionKey;
-    final Object[] row;
+    public final Object[] partitionKey;
+    public final Object[] row;
+
+    public Row(Object[] partitionKey)
+    {
+        this.partitionKey = partitionKey;
+        this.row = EMPTY_ROW_DATA;
+    }
 
     public Row(Object[] partitionKey, Object[] row)
     {
@@ -40,4 +47,8 @@ public class Row
         return row[column];
     }
 
+    public Row copy()
+    {
+        return new Row(partitionKey.clone(), row.clone());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 10191a6..a20272a 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -21,19 +21,14 @@ package org.apache.cassandra.stress.operations;
  */
 
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.util.Timing;
 import org.apache.commons.math3.distribution.EnumeratedDistribution;
 import org.apache.commons.math3.util.Pair;
 
 import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.generate.DistributionFactory;
-import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.util.Timer;
 
 public abstract class SampledOpDistributionFactory<T> implements 
OpDistributionFactory
@@ -47,7 +42,7 @@ public abstract class SampledOpDistributionFactory<T> 
implements OpDistributionF
         this.clustering = clustering;
     }
 
-    protected abstract Operation get(Timer timer, PartitionGenerator 
generator, T key);
+    protected abstract List<? extends Operation> get(Timer timer, 
PartitionGenerator generator, T key);
     protected abstract PartitionGenerator newGenerator();
 
     public OpDistribution get(Timing timing, int sampleCount)
@@ -55,8 +50,11 @@ public abstract class SampledOpDistributionFactory<T> 
implements OpDistributionF
         PartitionGenerator generator = newGenerator();
         List<Pair<Operation, Double>> operations = new ArrayList<>();
         for (Map.Entry<T, Double> ratio : ratios.entrySet())
-            operations.add(new 
Pair<>(get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, 
ratio.getKey()),
-                                      ratio.getValue()));
+        {
+            List<? extends Operation> ops = 
get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, 
ratio.getKey());
+            for (Operation op : ops)
+                operations.add(new Pair<>(op, ratio.getValue() / ops.size()));
+        }
         return new SampledOpDistribution(new 
EnumeratedDistribution<>(operations), clustering.get());
     }
 
@@ -77,7 +75,13 @@ public abstract class SampledOpDistributionFactory<T> 
implements OpDistributionF
             {
                 public OpDistribution get(Timing timing, int sampleCount)
                 {
-                    return new 
FixedOpDistribution(SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(),
 sampleCount), newGenerator(), ratio.getKey()));
+                    List<? extends Operation> ops = 
SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(),
 sampleCount), newGenerator(), ratio.getKey());
+                    if (ops.size() == 1)
+                        return new FixedOpDistribution(ops.get(0));
+                    List<Pair<Operation, Double>> ratios = new ArrayList<>();
+                    for (Operation op : ops)
+                        ratios.add(new Pair<>(op, 1d / ops.size()));
+                    return new SampledOpDistribution(new 
EnumeratedDistribution<Operation>(ratios), new DistributionFixed(1));
                 }
 
                 public String desc()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index a915d93..ef4d53f 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -32,7 +32,6 @@ import com.datastax.driver.core.Statement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.settings.ValidationType;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
@@ -44,7 +43,7 @@ public class SchemaInsert extends SchemaStatement
 
     public SchemaInsert(Timer timer, StressSettings settings, 
PartitionGenerator generator, SeedManager seedManager, Distribution batchSize, 
RatioDistribution useRatio, Integer thriftId, PreparedStatement statement, 
ConsistencyLevel cl, BatchStatement.Type batchType)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, batchSize, 
useRatio), statement, thriftId, cl, ValidationType.NOT_FAIL);
+        super(timer, settings, new DataSpec(generator, seedManager, batchSize, 
useRatio), statement, thriftId, cl);
         this.batchType = batchType;
     }
 
@@ -85,14 +84,7 @@ public class SchemaInsert extends SchemaStatement
                     stmt = batch;
                 }
 
-                try
-                {
-                    validate(client.getSession().execute(stmt));
-                }
-                catch (ClassCastException e)
-                {
-                    e.printStackTrace();
-                }
+                client.getSession().execute(stmt);
             }
             return true;
         }
@@ -113,7 +105,7 @@ public class SchemaInsert extends SchemaStatement
             {
                 while (iterator.hasNext())
                 {
-                    validate(client.execute_prepared_cql3_query(thriftId, 
iterator.getToken(), thriftRowArgs(iterator.next()), 
settings.command.consistencyLevel));
+                    client.execute_prepared_cql3_query(thriftId, 
iterator.getToken(), thriftRowArgs(iterator.next()), 
settings.command.consistencyLevel);
                     rowCount += 1;
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
index a51bac4..58f5307 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -33,7 +33,6 @@ import com.datastax.driver.core.ResultSet;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.settings.ValidationType;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
@@ -52,9 +51,9 @@ public class SchemaQuery extends SchemaStatement
     final Object[][] randomBuffer;
     final Random random = new Random();
 
-    public SchemaQuery(Timer timer, StressSettings settings, 
PartitionGenerator generator, SeedManager seedManager, Integer thriftId, 
PreparedStatement statement, ConsistencyLevel cl, ValidationType 
validationType, ArgSelect argSelect)
+    public SchemaQuery(Timer timer, StressSettings settings, 
PartitionGenerator generator, SeedManager seedManager, Integer thriftId, 
PreparedStatement statement, ConsistencyLevel cl, ArgSelect argSelect)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), argSelect == ArgSelect.MULTIROW ? 
statement.getVariables().size() : 1), statement, thriftId, cl, validationType);
+        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), argSelect == ArgSelect.MULTIROW ? 
statement.getVariables().size() : 1), statement, thriftId, cl);
         this.argSelect = argSelect;
         randomBuffer = new Object[argumentIndex.length][argumentIndex.length];
     }
@@ -71,7 +70,6 @@ public class SchemaQuery extends SchemaStatement
         public boolean run() throws Exception
         {
             ResultSet rs = client.getSession().execute(bindArgs());
-            validate(rs);
             rowCount = rs.all().size();
             partitionCount = Math.min(1, rowCount);
             return true;
@@ -90,7 +88,6 @@ public class SchemaQuery extends SchemaStatement
         public boolean run() throws Exception
         {
             CqlResult rs = client.execute_prepared_cql3_query(thriftId, 
partitions.get(0).getToken(), thriftArgs(), ThriftConversion.toThrift(cl));
-            validate(rs);
             rowCount = rs.getRowsSize();
             partitionCount = Math.min(1, rowCount);
             return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 4305151..e90de23 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.Row;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.settings.ValidationType;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.CqlResult;
@@ -46,18 +45,16 @@ public abstract class SchemaStatement extends Operation
     final PreparedStatement statement;
     final Integer thriftId;
     final ConsistencyLevel cl;
-    final ValidationType validationType;
     final int[] argumentIndex;
     final Object[] bindBuffer;
 
     public SchemaStatement(Timer timer, StressSettings settings, DataSpec spec,
-                           PreparedStatement statement, Integer thriftId, 
ConsistencyLevel cl, ValidationType validationType)
+                           PreparedStatement statement, Integer thriftId, 
ConsistencyLevel cl)
     {
         super(timer, settings, spec);
         this.statement = statement;
         this.thriftId = thriftId;
         this.cl = cl;
-        this.validationType = validationType;
         argumentIndex = new int[statement.getVariables().size()];
         bindBuffer = new Object[argumentIndex.length];
         int i = 0;
@@ -86,36 +83,6 @@ public abstract class SchemaStatement extends Operation
         return args;
     }
 
-    void validate(ResultSet rs)
-    {
-        switch (validationType)
-        {
-            case NOT_FAIL:
-                return;
-            case NON_ZERO:
-                if (rs.all().size() == 0)
-                    throw new IllegalStateException("Expected non-zero 
results");
-                break;
-            default:
-                throw new IllegalStateException("Unsupported validation type");
-        }
-    }
-
-    void validate(CqlResult rs)
-    {
-        switch (validationType)
-        {
-            case NOT_FAIL:
-                return;
-            case NON_ZERO:
-                if (rs.getRowsSize() == 0)
-                    throw new IllegalStateException("Expected non-zero 
results");
-                break;
-            default:
-                throw new IllegalStateException("Unsupported validation type");
-        }
-    }
-
     @Override
     public void run(SimpleClient client) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
new file mode 100644
index 0000000..2cbdcb4
--- /dev/null
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
@@ -0,0 +1,359 @@
+package org.apache.cassandra.stress.operations.userdefined;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.datastax.driver.core.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlRow;
+import org.apache.cassandra.thrift.ThriftConversion;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.utils.Pair;
+import org.apache.thrift.TException;
+
+public class ValidatingSchemaQuery extends Operation
+{
+    final Random random = new Random();
+    private Pair<Row, Row> bounds;
+
+    final int clusteringComponents;
+    final ValidatingStatement[] statements;
+    final ConsistencyLevel cl;
+    final int[] argumentIndex;
+    final Object[] bindBuffer;
+
+    @Override
+    public void run(SimpleClient client) throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    private ValidatingSchemaQuery(Timer timer, StressSettings settings, 
PartitionGenerator generator, SeedManager seedManager, ValidatingStatement[] 
statements, ConsistencyLevel cl, int clusteringComponents)
+    {
+        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), 1));
+        this.statements = statements;
+        this.cl = cl;
+        argumentIndex = new int[statements[0].statement.getVariables().size()];
+        bindBuffer = new Object[argumentIndex.length];
+        int i = 0;
+        for (ColumnDefinitions.Definition definition : 
statements[0].statement.getVariables())
+            argumentIndex[i++] = 
spec.partitionGenerator.indexOf(definition.getName());
+
+        for (ValidatingStatement statement : statements)
+            statement.statement.setConsistencyLevel(JavaDriverClient.from(cl));
+        this.clusteringComponents = clusteringComponents;
+    }
+
+    protected boolean reset(Seed seed, PartitionIterator iterator)
+    {
+        bounds = iterator.resetToBounds(seed, clusteringComponents);
+        return true;
+    }
+
+    abstract class Runner implements RunOp
+    {
+        int partitionCount;
+        int rowCount;
+        final PartitionIterator iter;
+        final int statementIndex;
+
+        protected Runner(PartitionIterator iter)
+        {
+            this.iter = iter;
+            statementIndex = 
ThreadLocalRandom.current().nextInt(statements.length);
+        }
+
+        @Override
+        public int partitionCount()
+        {
+            return partitionCount;
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return rowCount;
+        }
+    }
+
+    private class JavaDriverRun extends Runner
+    {
+        final JavaDriverClient client;
+
+        private JavaDriverRun(JavaDriverClient client, PartitionIterator iter)
+        {
+            super(iter);
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            ResultSet rs = client.getSession().execute(bind(statementIndex));
+            int[] valueIndex = new int[rs.getColumnDefinitions().size()];
+            {
+                int i = 0;
+                for (ColumnDefinitions.Definition definition : 
rs.getColumnDefinitions())
+                    valueIndex[i++] = 
spec.partitionGenerator.indexOf(definition.getName());
+            }
+
+            List<Object[]> prev1 = new ArrayList<>();
+            List<Object[]> prev2 = new ArrayList<>();
+            rowCount = 0;
+            Iterator<com.datastax.driver.core.Row> results = rs.iterator();
+            if (!statements[statementIndex].inclusiveStart && iter.hasNext())
+                iter.next();
+            while (iter.hasNext())
+            {
+                Row expectedRow = iter.next();
+                if (!statements[statementIndex].inclusiveEnd && 
!iter.hasNext())
+                    break;
+
+                if (!results.hasNext())
+                    return false;
+
+                rowCount++;
+                com.datastax.driver.core.Row actualRow = results.next();
+                Object[] vs1 = new 
Object[actualRow.getColumnDefinitions().size()];
+                Object[] vs2 = vs1.clone();
+                for (int i = 0 ; i < actualRow.getColumnDefinitions().size() ; 
i++)
+                {
+                    Object expectedValue = expectedRow.get(valueIndex[i]);
+                    Object actualValue = 
spec.partitionGenerator.convert(valueIndex[i], actualRow.getBytesUnsafe(i));
+                    vs1[i] = expectedValue;
+                    vs2[i] = actualValue;
+                    if (!expectedValue.equals(actualValue))
+                        return false;
+                }
+                prev1.add(vs1);
+                prev2.add(vs2);
+            }
+            partitionCount = Math.min(1, rowCount);
+            if (!rs.isExhausted())
+                return false;
+            return true;
+        }
+    }
+
+    private class ThriftRun extends Runner
+    {
+        final ThriftClient client;
+
+        private ThriftRun(ThriftClient client, PartitionIterator iter)
+        {
+            super(iter);
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            CqlResult rs = 
client.execute_prepared_cql3_query(statements[statementIndex].thriftId, 
partitions.get(0).getToken(), thriftArgs(), ThriftConversion.toThrift(cl));
+            int[] valueIndex = new int[rs.getSchema().name_types.size()];
+                for (int i = 0 ; i < valueIndex.length ; i++)
+                    valueIndex[i] = 
spec.partitionGenerator.indexOf(rs.fieldForId(i).getFieldName());
+            int r = 0;
+            if (!statements[statementIndex].inclusiveStart && iter.hasNext())
+                iter.next();
+            while (iter.hasNext())
+            {
+                Row expectedRow = iter.next();
+                if (!statements[statementIndex].inclusiveEnd && 
!iter.hasNext())
+                    break;
+
+                if (r == rs.num)
+                    return false;
+
+                rowCount++;
+                CqlRow actualRow = rs.getRows().get(r++);
+                for (int i = 0 ; i < actualRow.getColumnsSize() ; i++)
+                {
+                    ByteBuffer expectedValue = 
spec.partitionGenerator.convert(valueIndex[i], expectedRow.get(valueIndex[i]));
+                    ByteBuffer actualValue = 
actualRow.getColumns().get(i).value;
+                    if (!expectedValue.equals(actualValue))
+                        return false;
+                }
+            }
+            assert r == rs.num;
+            partitionCount = Math.min(1, rowCount);
+            return true;
+        }
+    }
+
+    BoundStatement bind(int statementIndex)
+    {
+        int pkc = bounds.left.partitionKey.length;
+        System.arraycopy(bounds.left.partitionKey, 0, bindBuffer, 0, pkc);
+        int ccc = bounds.left.row.length;
+        System.arraycopy(bounds.left.row, 0, bindBuffer, pkc, ccc);
+        System.arraycopy(bounds.right.row, 0, bindBuffer, pkc + ccc, ccc);
+        return statements[statementIndex].statement.bind(bindBuffer);
+    }
+
+    List<ByteBuffer> thriftArgs()
+    {
+        List<ByteBuffer> args = new ArrayList<>();
+        int pkc = bounds.left.partitionKey.length;
+        for (int i = 0 ; i < pkc ; i++)
+            args.add(spec.partitionGenerator.convert(-i, 
bounds.left.partitionKey[i]));
+        int ccc = bounds.left.row.length;
+        for (int i = 0 ; i < ccc ; i++)
+            args.add(spec.partitionGenerator.convert(i, bounds.left.get(i)));
+        for (int i = 0 ; i < ccc ; i++)
+            args.add(spec.partitionGenerator.convert(i, bounds.right.get(i)));
+        return args;
+    }
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        timeWithRetry(new JavaDriverRun(client, partitions.get(0)));
+    }
+
+    @Override
+    public void run(ThriftClient client) throws IOException
+    {
+        timeWithRetry(new ThriftRun(client, partitions.get(0)));
+    }
+
+    public static class Factory
+    {
+        final ValidatingStatement[] statements;
+        final int clusteringComponents;
+
+        public Factory(ValidatingStatement[] statements, int 
clusteringComponents)
+        {
+            this.statements = statements;
+            this.clusteringComponents = clusteringComponents;
+        }
+
+        public ValidatingSchemaQuery create(Timer timer, StressSettings 
settings, PartitionGenerator generator, SeedManager seedManager, 
ConsistencyLevel cl)
+        {
+            return new ValidatingSchemaQuery(timer, settings, generator, 
seedManager, statements, cl, clusteringComponents);
+        }
+    }
+
+    public static List<Factory> create(TableMetadata metadata, StressSettings 
settings)
+    {
+        List<Factory> factories = new ArrayList<>();
+        StringBuilder sb = new StringBuilder();
+        boolean first = true;
+        sb.append("SELECT * FROM ");
+        sb.append(metadata.getName());
+        sb.append(" WHERE");
+        for (ColumnMetadata pk : metadata.getPartitionKey())
+        {
+            sb.append(first ? " " : " AND ");
+            sb.append(pk.getName());
+            sb.append(" = ?");
+            first = false;
+        }
+        String base = sb.toString();
+
+        factories.add(new Factory(new ValidatingStatement[] { 
prepare(settings, base, true, true) }, 0));
+
+        int maxDepth = metadata.getClusteringColumns().size() - 1;
+        for (int depth = 0 ; depth <= maxDepth  ; depth++)
+        {
+            StringBuilder cc = new StringBuilder();
+            StringBuilder arg = new StringBuilder();
+            cc.append("("); arg.append("(");
+            for (int d = 0 ; d <= depth ; d++)
+            {
+                if (d > 0) { cc.append(","); arg.append(","); }
+                cc.append(metadata.getClusteringColumns().get(d).getName());
+                arg.append("?");
+            }
+            cc.append(")"); arg.append(")");
+
+            ValidatingStatement[] statements = new ValidatingStatement[depth < 
maxDepth ? 1 : 4];
+            int i = 0;
+            for (boolean incLb : depth < maxDepth ? new boolean[] { true } : 
new boolean[] { true, false } )
+            {
+                for (boolean incUb : depth < maxDepth ? new boolean[] { false 
} : new boolean[] { true, false } )
+                {
+                    String lb = incLb ? ">=" : ">";
+                    String ub = incUb ? "<=" : "<";
+                    sb.setLength(0);
+                    sb.append(base);
+                    sb.append(" AND ");
+                    sb.append(cc);
+                    sb.append(lb);
+                    sb.append(arg);
+                    sb.append(" AND ");
+                    sb.append(cc);
+                    sb.append(ub);
+                    sb.append(arg);
+                    statements[i++] = prepare(settings, sb.toString(), incLb, 
incUb);
+                }
+            }
+            factories.add(new Factory(statements, depth + 1));
+        }
+
+        return factories;
+    }
+
+    private static class ValidatingStatement
+    {
+        final PreparedStatement statement;
+        final Integer thriftId;
+        final boolean inclusiveStart;
+        final boolean inclusiveEnd;
+        private ValidatingStatement(PreparedStatement statement, Integer 
thriftId, boolean inclusiveStart, boolean inclusiveEnd)
+        {
+            this.statement = statement;
+            this.thriftId = thriftId;
+            this.inclusiveStart = inclusiveStart;
+            this.inclusiveEnd = inclusiveEnd;
+        }
+    }
+
+    private static ValidatingStatement prepare(StressSettings settings, String 
cql, boolean incLb, boolean incUb)
+    {
+        JavaDriverClient jclient = settings.getJavaDriverClient();
+        ThriftClient tclient = settings.getThriftClient();
+        PreparedStatement statement = jclient.prepare(cql);
+        try
+        {
+            Integer thriftId = tclient.prepare_cql3_query(cql, 
Compression.NONE);
+            return new ValidatingStatement(statement, thriftId, incLb, incUb);
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
 
b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
index 3a1d552..861b1a4 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
@@ -21,10 +21,7 @@ package org.apache.cassandra.stress.settings;
  */
 
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.DistributionFactory;
@@ -58,9 +55,9 @@ public class SettingsCommandPreDefinedMixed extends 
SettingsCommandPreDefined
         final SeedManager seeds = new SeedManager(settings);
         return new SampledOpDistributionFactory<Command>(ratios, clustering)
         {
-            protected Operation get(Timer timer, PartitionGenerator generator, 
Command key)
+            protected List<? extends Operation> get(Timer timer, 
PartitionGenerator generator, Command key)
             {
-                return PredefinedOperation.operation(key, timer, generator, 
seeds, settings, add);
+                return 
Collections.singletonList(PredefinedOperation.operation(key, timer, generator, 
seeds, settings, add));
             }
 
             protected PartitionGenerator newGenerator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
 
b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
index d4e43cf..5228828 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
@@ -24,6 +24,7 @@ package org.apache.cassandra.stress.settings;
 import java.io.File;
 import java.net.URI;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -75,11 +76,13 @@ public class SettingsCommandUser extends SettingsCommand
         final SeedManager seeds = new SeedManager(settings);
         return new SampledOpDistributionFactory<String>(ratios, clustering)
         {
-            protected Operation get(Timer timer, PartitionGenerator generator, 
String key)
+            protected List<? extends Operation> get(Timer timer, 
PartitionGenerator generator, String key)
             {
                 if (key.equalsIgnoreCase("insert"))
-                    return profile.getInsert(timer, generator, seeds, 
settings);
-                return profile.getQuery(key, timer, generator, seeds, 
settings);
+                    return Collections.singletonList(profile.getInsert(timer, 
generator, seeds, settings));
+                if (key.equalsIgnoreCase("validate"))
+                    return profile.getValidate(timer, generator, seeds, 
settings);
+                return Collections.singletonList(profile.getQuery(key, timer, 
generator, seeds, settings));
             }
 
             protected PartitionGenerator newGenerator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/settings/ValidationType.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/settings/ValidationType.java 
b/tools/stress/src/org/apache/cassandra/stress/settings/ValidationType.java
deleted file mode 100644
index 710b717..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/settings/ValidationType.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.cassandra.stress.settings;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-public enum ValidationType
-{
-
-    NOT_FAIL, NON_ZERO, SUBSET, EQUAL
-
-}

Reply via email to