Updated Branches: refs/heads/trunk 21de3328a -> de01d07a0
iss-6691 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84f2b890 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84f2b890 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84f2b890 Branch: refs/heads/trunk Commit: 84f2b8908e9cdc418a03322db41c9db1ac4f6d6f Parents: 67101c2 Author: belliottsmith <git...@sub.laerad.com> Authored: Wed Feb 12 11:36:35 2014 +0000 Committer: belliottsmith <git...@sub.laerad.com> Committed: Wed Feb 12 11:36:35 2014 +0000 ---------------------------------------------------------------------- .../org/apache/cassandra/stress/Operation.java | 74 +++++++++--- .../apache/cassandra/stress/StressAction.java | 7 +- .../cassandra/stress/generatedata/DataGen.java | 6 +- .../stress/generatedata/DataGenBytesRandom.java | 2 +- .../stress/generatedata/DataGenHex.java | 2 +- .../generatedata/DataGenStringDictionary.java | 6 +- .../generatedata/DataGenStringRepeats.java | 16 +-- .../cassandra/stress/generatedata/KeyGen.java | 2 +- .../cassandra/stress/generatedata/RowGen.java | 4 +- .../operations/CqlIndexedRangeSlicer.java | 2 +- .../stress/operations/CqlInserter.java | 2 +- .../stress/operations/CqlOperation.java | 112 +++++++++++++++++++ .../cassandra/stress/operations/CqlReader.java | 8 +- .../stress/operations/ThriftCounterAdder.java | 2 +- .../operations/ThriftIndexedRangeSlicer.java | 2 +- .../stress/operations/ThriftInserter.java | 6 +- .../stress/operations/ThriftReader.java | 30 ++++- 17 files changed, 239 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/Operation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java index fa7a453..4519b19 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Operation.java +++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java @@ -21,10 +21,25 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; +import java.util.EnumMap; import java.util.List; import org.apache.cassandra.stress.generatedata.KeyGen; import org.apache.cassandra.stress.generatedata.RowGen; +import org.apache.cassandra.stress.operations.CqlCounterAdder; +import org.apache.cassandra.stress.operations.CqlCounterGetter; +import org.apache.cassandra.stress.operations.CqlIndexedRangeSlicer; +import org.apache.cassandra.stress.operations.CqlInserter; +import org.apache.cassandra.stress.operations.CqlMultiGetter; +import org.apache.cassandra.stress.operations.CqlRangeSlicer; +import org.apache.cassandra.stress.operations.CqlReader; +import org.apache.cassandra.stress.operations.ThriftCounterAdder; +import org.apache.cassandra.stress.operations.ThriftCounterGetter; +import org.apache.cassandra.stress.operations.ThriftIndexedRangeSlicer; +import org.apache.cassandra.stress.operations.ThriftInserter; +import org.apache.cassandra.stress.operations.ThriftMultiGetter; +import org.apache.cassandra.stress.operations.ThriftRangeSlicer; +import org.apache.cassandra.stress.operations.ThriftReader; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.CqlVersion; import org.apache.cassandra.stress.settings.SettingsCommandMixed; @@ -66,7 +81,8 @@ public abstract class Operation public final RowGen rowGen; public final List<ColumnParent> columnParents; public final StressMetrics metrics; - public final SettingsCommandMixed.CommandSelector readWriteSelector; + public final SettingsCommandMixed.CommandSelector commandSelector; + private final EnumMap<Command, State> substates; private Object cqlCache; public State(Command type, StressSettings settings, StressMetrics metrics) @@ -74,9 +90,15 @@ public abstract class Operation this.type = type; this.timer = metrics.getTiming().newTimer(); if (type == Command.MIXED) - readWriteSelector = ((SettingsCommandMixed) settings.command).selector(); + { + commandSelector = ((SettingsCommandMixed) settings.command).selector(); + substates = new EnumMap<>(Command.class); + } else - readWriteSelector = null; + { + commandSelector = null; + substates = null; + } this.settings = settings; this.keyGen = settings.keys.newKeyGen(); this.rowGen = settings.columns.newRowGen(); @@ -91,6 +113,20 @@ public abstract class Operation columnParents = Arrays.asList(cp); } } + + private State(Command type, State copy) + { + this.type = type; + this.timer = copy.timer; + this.rowGen = copy.rowGen; + this.keyGen = copy.keyGen; + this.columnParents = copy.columnParents; + this.metrics = copy.metrics; + this.settings = copy.settings; + this.substates = null; + this.commandSelector = null; + } + public boolean isCql3() { return settings.mode.cqlVersion == CqlVersion.CQL3; @@ -107,6 +143,18 @@ public abstract class Operation { cqlCache = val; } + + public State substate(Command command) + { + assert type == Command.MIXED; + State substate = substates.get(command); + if (substate == null) + { + substates.put(command, substate = new State(command, this)); + } + return substate; + } + } protected ByteBuffer getKey() @@ -119,9 +167,9 @@ public abstract class Operation return state.keyGen.getKeys(count, index); } - protected List<ByteBuffer> generateColumnValues() + protected List<ByteBuffer> generateColumnValues(ByteBuffer key) { - return state.rowGen.generate(index); + return state.rowGen.generate(index, key); } /** @@ -146,20 +194,18 @@ public abstract class Operation boolean success = false; String exceptionMessage = null; - for (int t = 0; t < state.settings.command.tries; t++) + int tries = 0; + for (; tries < state.settings.command.tries; tries++) { - if (success) - break; - try { success = run.run(); + break; } catch (Exception e) { System.err.println(e); exceptionMessage = getExceptionMessage(e); - success = false; } } @@ -167,11 +213,13 @@ public abstract class Operation if (!success) { - error(String.format("Operation [%d] retried %d times - error executing for key %s %s%n", + error(String.format("Operation [%d] x%d key %s %s%n", index, - state.settings.command.tries, + tries, run.key(), - (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); + (exceptionMessage == null) + ? "Data returned was not validated" + : "Error executing: " + exceptionMessage)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/StressAction.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index 0312093..d85f010 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -93,6 +93,10 @@ public class StressAction implements Runnable default: throw new IllegalStateException(); } + + // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance; + // so warm up all the nodes we're speaking to only. + iterations *= settings.node.nodes.size(); output.println(String.format("Warming up %s with %d iterations...", type, iterations)); run(type, 20, iterations, warmupOutput); } @@ -533,7 +537,8 @@ public class StressAction implements Runnable } case MIXED: - return createOperation(state.readWriteSelector.next(), state, index); + Command subcommand = state.commandSelector.next(); + return createOperation(subcommand, state.substate(subcommand), index); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java index 4c22005..c441b7e 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java +++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java @@ -6,13 +6,13 @@ import java.util.List; public abstract class DataGen { - public abstract void generate(ByteBuffer fill, long offset); + public abstract void generate(ByteBuffer fill, long index, ByteBuffer seed); public abstract boolean isDeterministic(); - public void generate(List<ByteBuffer> fills, long offset) + public void generate(List<ByteBuffer> fills, long index, ByteBuffer seed) { for (ByteBuffer fill : fills) - generate(fill, offset++); + generate(fill, index++, seed); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java index 3906f93..cce438d 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java +++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java @@ -9,7 +9,7 @@ public class DataGenBytesRandom extends DataGen private final Random rnd = new Random(); @Override - public void generate(ByteBuffer fill, long offset) + public void generate(ByteBuffer fill, long index, ByteBuffer seed) { fill.clear(); rnd.nextBytes(fill.array()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java index 50d49dd..b71d3e9 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java +++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java @@ -8,7 +8,7 @@ public abstract class DataGenHex extends DataGen abstract long next(long operationIndex); @Override - public final void generate(ByteBuffer fill, long operationIndex) + public final void generate(ByteBuffer fill, long operationIndex, ByteBuffer seed) { fill.clear(); fillKeyStringBytes(next(operationIndex), fill.array()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java index e581232..7733ed6 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java +++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java @@ -25,13 +25,13 @@ public class DataGenStringDictionary extends DataGen } @Override - public void generate(ByteBuffer fill, long index) + public void generate(ByteBuffer fill, long index, ByteBuffer seed) { fill(fill, 0); } @Override - public void generate(List<ByteBuffer> fills, long index) + public void generate(List<ByteBuffer> fills, long index, ByteBuffer seed) { for (int i = 0 ; i < fills.size() ; i++) fill(fills.get(0), i); @@ -55,7 +55,7 @@ public class DataGenStringDictionary extends DataGen @Override public boolean isDeterministic() { - return true; + return false; } public static DataGenFactory getFactory(File file) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java index 47091f7..4c5bb89 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java +++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java @@ -25,32 +25,32 @@ public class DataGenStringRepeats extends DataGen } @Override - public void generate(ByteBuffer fill, long index) + public void generate(ByteBuffer fill, long index, ByteBuffer seed) { - fill(fill, index, 0); + fill(fill, index, 0, seed); } @Override - public void generate(List<ByteBuffer> fills, long index) + public void generate(List<ByteBuffer> fills, long index, ByteBuffer seed) { for (int i = 0 ; i < fills.size() ; i++) { - fill(fills.get(i), index, i); + fill(fills.get(i), index, i, seed); } } - private void fill(ByteBuffer fill, long index, int column) + private void fill(ByteBuffer fill, long index, int column, ByteBuffer seed) { fill.clear(); byte[] trg = fill.array(); - byte[] src = getData(index, column); + byte[] src = getData(index, column, seed); for (int j = 0 ; j < trg.length ; j += src.length) System.arraycopy(src, 0, trg, j, Math.min(src.length, trg.length - j)); } - private byte[] getData(long index, int column) + private byte[] getData(long index, int column, ByteBuffer seed) { - final long key = (column * repeatFrequency) + (index % repeatFrequency); + final long key = (column * repeatFrequency) + ((seed == null ? index : Math.abs(seed.hashCode())) % repeatFrequency); byte[] r = cache.get(key); if (r != null) return r; http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java index cdd6d39..36dc31d 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java +++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java @@ -21,7 +21,7 @@ public class KeyGen { while (keyBuffers.size() < n) keyBuffers.add(ByteBuffer.wrap(new byte[keySize])); - dataGen.generate(keyBuffers, index); + dataGen.generate(keyBuffers, index, null); return keyBuffers; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java index 869fbc7..3174177 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java +++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java @@ -16,10 +16,10 @@ public abstract class RowGen this.dataGen = dataGenerator; } - public List<ByteBuffer> generate(long operationIndex) + public List<ByteBuffer> generate(long operationIndex, ByteBuffer key) { List<ByteBuffer> fill = getColumns(operationIndex); - dataGen.generate(fill, operationIndex); + dataGen.generate(fill, operationIndex, key); return fill; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java index 748bf30..ff43322 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java @@ -70,7 +70,7 @@ public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]> protected void run(CqlOperation.ClientWrapper client) throws IOException { acceptNoResults = false; - final List<ByteBuffer> columns = generateColumnValues(); + final List<ByteBuffer> columns = generateColumnValues(getKey()); final ByteBuffer value = columns.get(1); // only C1 column is indexed byte[] minKey = new byte[0]; int rowCount; http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java index 82f00aa..8d964f5 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java @@ -72,7 +72,7 @@ public class CqlInserter extends CqlOperation<Integer> protected List<ByteBuffer> getQueryParameters(byte[] key) { final ArrayList<ByteBuffer> queryParams = new ArrayList<>(); - final List<ByteBuffer> values = generateColumnValues(); + final List<ByteBuffer> values = generateColumnValues(ByteBuffer.wrap(key)); queryParams.addAll(values); queryParams.add(ByteBuffer.wrap(key)); return queryParams; http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java index 744e7f6..bd2f131 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java @@ -20,6 +20,7 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import com.datastax.driver.core.PreparedStatement; @@ -33,6 +34,7 @@ import org.apache.cassandra.stress.util.JavaDriverClient; import org.apache.cassandra.stress.util.ThriftClient; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.thrift.CqlRow; import org.apache.cassandra.thrift.ThriftConversion; import org.apache.cassandra.transport.SimpleClient; import org.apache.cassandra.transport.messages.ResultMessage; @@ -167,6 +169,35 @@ public abstract class CqlOperation<V> extends Operation } + protected final class CqlRunOpMatchResults extends CqlRunOp<ByteBuffer[][]> + { + + final List<List<ByteBuffer>> expect; + + // a null value for an item in expect means we just check the row is present + protected CqlRunOpMatchResults(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key, List<List<ByteBuffer>> expect) + { + super(client, query, queryId, RowsHandler.INSTANCE, params, id, key); + this.expect = expect; + } + + @Override + public int keyCount() + { + return result == null ? 0 : result.length; + } + + public boolean validate(ByteBuffer[][] result) + { + if (result.length != expect.size()) + return false; + for (int i = 0 ; i < result.length ; i++) + if (!expect.get(i).equals(Arrays.asList(result[i]))) + return false; + return true; + } + } + // Cql protected abstract class CqlRunOp<V> implements RunOp { @@ -451,6 +482,87 @@ public abstract class CqlOperation<V> extends Operation } // Processes results from each client into an array of all key bytes returned + protected static final class RowsHandler implements ResultHandler<ByteBuffer[][]> + { + static final RowsHandler INSTANCE = new RowsHandler(); + + @Override + public Function<ResultSet, ByteBuffer[][]> javaDriverHandler() + { + return new Function<ResultSet, ByteBuffer[][]>() + { + + @Override + public ByteBuffer[][] apply(ResultSet result) + { + if (result == null) + return new ByteBuffer[0][]; + List<Row> rows = result.all(); + + ByteBuffer[][] r = new ByteBuffer[rows.size()][]; + for (int i = 0 ; i < r.length ; i++) + { + Row row = rows.get(i); + r[i] = new ByteBuffer[row.getColumnDefinitions().size() - 1]; + for (int j = 1 ; j < row.getColumnDefinitions().size() ; j++) + r[i][j - 1] = row.getBytes(j); + } + return r; + } + }; + } + + @Override + public Function<ResultMessage, ByteBuffer[][]> thriftHandler() + { + return new Function<ResultMessage, ByteBuffer[][]>() + { + + @Override + public ByteBuffer[][] apply(ResultMessage result) + { + if (result instanceof ResultMessage.Rows) + { + ResultMessage.Rows rows = ((ResultMessage.Rows) result); + ByteBuffer[][] r = new ByteBuffer[rows.result.size()][]; + for (int i = 0 ; i < r.length ; i++) + { + List<ByteBuffer> row = rows.result.rows.get(i); + r[i] = new ByteBuffer[row.size()]; + for (int j = 0 ; j < row.size() ; j++) + r[i][j] = row.get(j); + } + return r; + } + return new ByteBuffer[0][]; + } + }; + } + + @Override + public Function<CqlResult, ByteBuffer[][]> simpleNativeHandler() + { + return new Function<CqlResult, ByteBuffer[][]>() + { + + @Override + public ByteBuffer[][] apply(CqlResult result) + { + ByteBuffer[][] r = new ByteBuffer[result.getRows().size()][]; + for (int i = 0 ; i < r.length ; i++) + { + CqlRow row = result.getRows().get(i); + r[i] = new ByteBuffer[row.getColumns().size()]; + for (int j = 0 ; j < r[i].length ; j++) + r[i][j] = ByteBuffer.wrap(row.getColumns().get(j).getValue()); + } + return r; + } + }; + } + + } + // Processes results from each client into an array of all key bytes returned protected static final class KeysHandler implements ResultHandler<byte[][]> { static final KeysHandler INSTANCE = new KeysHandler(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java index 749a482..44da43f 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java @@ -23,10 +23,11 @@ package org.apache.cassandra.stress.operations; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; -public class CqlReader extends CqlOperation<Integer> +public class CqlReader extends CqlOperation<ByteBuffer[][]> { public CqlReader(State state, long idx) @@ -79,9 +80,10 @@ public class CqlReader extends CqlOperation<Integer> } @Override - protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key) + protected CqlRunOp<ByteBuffer[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key) { - return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key); + List<ByteBuffer> expectRow = state.rowGen.isDeterministic() ? generateColumnValues(key) : null; + return new CqlRunOpMatchResults(client, query, queryId, params, keyid, key, Arrays.asList(expectRow)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java index b1657b2..26695a6 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java @@ -39,7 +39,7 @@ public class ThriftCounterAdder extends Operation public void run(final ThriftClient client) throws IOException { - List<CounterColumn> columns = new ArrayList<CounterColumn>(); + List<CounterColumn> columns = new ArrayList<>(); for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++) columns.add(new CounterColumn(getColumnNameBytes(i), 1L)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java index c6b1b03..6eab209 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java @@ -49,7 +49,7 @@ public class ThriftIndexedRangeSlicer extends Operation .setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, state.settings.columns.maxColumnsPerKey)); - final List<ByteBuffer> columns = generateColumnValues(); + final List<ByteBuffer> columns = generateColumnValues(getKey()); final ColumnParent parent = state.columnParents.get(0); final ByteBuffer columnName = getColumnNameBytes(1); http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java index c5f8051..b107f26 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java @@ -42,7 +42,7 @@ public final class ThriftInserter extends Operation public void run(final ThriftClient client) throws IOException { final ByteBuffer key = getKey(); - final List<Column> columns = generateColumns(); + final List<Column> columns = generateColumns(key); Map<String, List<Mutation>> row; if (!state.settings.columns.useSuperColumns) @@ -92,9 +92,9 @@ public final class ThriftInserter extends Operation }); } - protected List<Column> generateColumns() + protected List<Column> generateColumns(ByteBuffer key) { - final List<ByteBuffer> values = generateColumnValues(); + final List<ByteBuffer> values = generateColumnValues(key); final List<Column> columns = new ArrayList<>(values.size()); if (state.settings.columns.useTimeUUIDComparator) http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f2b890/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java index a8605e8..c50843f 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java @@ -19,12 +19,16 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.cassandra.stress.Operation; import org.apache.cassandra.stress.util.ThriftClient; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.SliceRange; +import org.apache.cassandra.thrift.SuperColumn; public final class ThriftReader extends Operation { @@ -48,6 +52,7 @@ public final class ThriftReader extends Operation predicate.setColumn_names(state.settings.columns.names); final ByteBuffer key = getKey(); + final List<ByteBuffer> expect = state.rowGen.isDeterministic() ? generateColumnValues(key) : null; for (final ColumnParent parent : state.columnParents) { timeWithRetry(new RunOp() @@ -55,7 +60,30 @@ public final class ThriftReader extends Operation @Override public boolean run() throws Exception { - return client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel).size() != 0; + List<ColumnOrSuperColumn> row = client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel); + if (expect == null) + return !row.isEmpty(); + if (!state.settings.columns.useSuperColumns) + { + if (row.size() != expect.size()) + return false; + for (int i = 0 ; i < row.size() ; i++) + if (!row.get(i).getColumn().bufferForValue().equals(expect.get(i))) + return false; + } + else + { + for (ColumnOrSuperColumn col : row) + { + SuperColumn superColumn = col.getSuper_column(); + if (superColumn.getColumns().size() != expect.size()) + return false; + for (int i = 0 ; i < expect.size() ; i++) + if (!superColumn.getColumns().get(i).bufferForValue().equals(expect.get(i))) + return false; + } + } + return true; } @Override