Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8c95d376 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8c95d376 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8c95d376 Branch: refs/heads/cassandra-3.X Commit: 8c95d376dc2211a7e34acb3c533dcd4684330d96 Parents: af60ca9 db869bc Author: Aleksey Yeschenko <alek...@apache.org> Authored: Tue Oct 4 16:11:31 2016 +0100 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Oct 4 16:22:06 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/StressCQLSSTableWriter.java | 8 +++++--- .../src/org/apache/cassandra/stress/Stress.java | 2 +- .../org/apache/cassandra/stress/StressProfile.java | 14 +++++++------- 4 files changed, 14 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c95d376/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 0c1fd76,827a208..2819aea --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,78 -1,5 +1,79 @@@ -3.0.10 +3.10 + * Add hint delivery metrics (CASSANDRA-12693) + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731) + * ColumnIndex does not reuse buffer (CASSANDRA-12502) + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697) + * Upgrade metrics-reporter dependencies (CASSANDRA-12089) + * Tune compaction thread count via nodetool (CASSANDRA-12248) + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232) + * Include repair session IDs in repair start message (CASSANDRA-12532) + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039) + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667) + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318) + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647) + * Fix cassandra-stress graphing (CASSANDRA-12237) + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585) + * Add JMH benchmarks.jar (CASSANDRA-12586) + * Add row offset support to SASI (CASSANDRA-11990) + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) + * Add keep-alive to streaming (CASSANDRA-11841) + * Tracing payload is passed through newSession(..) (CASSANDRA-11706) + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261) + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486) + * Retry all internode messages once after a connection is + closed and reopened (CASSANDRA-12192) + * Add support to rebuild from targeted replica (CASSANDRA-9875) + * Add sequence distribution type to cassandra stress (CASSANDRA-12490) + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154) + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474) + * Extend read/write failure messages with a map of replica addresses + to error codes in the v5 native protocol (CASSANDRA-12311) + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374) + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550) + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378) + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223) + * Added slow query log (CASSANDRA-12403) + * Count full coordinated request against timeout (CASSANDRA-12256) + * Allow TTL with null value on insert and update (CASSANDRA-12216) + * Make decommission operation resumable (CASSANDRA-12008) + * Add support to one-way targeted repair (CASSANDRA-9876) + * Remove clientutil jar (CASSANDRA-11635) + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717) + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358) + * Cassandra stress should dump all setting on startup (CASSANDRA-11914) + * Make it possible to compact a given token range (CASSANDRA-10643) + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179) + * Collect metrics on queries by consistency level (CASSANDRA-7384) + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707) + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228) + * Upgrade to OHC 0.4.4 (CASSANDRA-12133) + * Add version command to cassandra-stress (CASSANDRA-12258) + * Create compaction-stress tool (CASSANDRA-11844) + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019) + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142) + * Support filtering on non-PRIMARY KEY columns in the CREATE + MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368) + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004) + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174) + * Faster write path (CASSANDRA-12269) + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424) + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035) + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635) + * Prepend snapshot name with "truncated" or "dropped" when a snapshot + is taken before truncating or dropping a table (CASSANDRA-12178) + * Optimize RestrictionSet (CASSANDRA-12153) + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150) + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613) + * Create a system table to expose prepared statements (CASSANDRA-8831) + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970) + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580) + * Add supplied username to authentication error messages (CASSANDRA-12076) + * Remove pre-startup check for open JMX port (CASSANDRA-12074) + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738) + * Restore resumable hints delivery (CASSANDRA-11960) +Merged from 3.0: + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478) * Fix exceptions with new vnode allocation (CASSANDRA-12715) * Unify drain and shutdown processes (CASSANDRA-12509) * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c95d376/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java ---------------------------------------------------------------------- diff --cc tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java index 4fe05a8,0000000..41a0d6f mode 100644,000000..100644 --- a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java +++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java @@@ -1,672 -1,0 +1,674 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.stream.Collectors; + +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TypeCodec; ++import org.antlr.runtime.RecognitionException; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; ++import org.apache.cassandra.cql3.CQLFragmentParser; +import org.apache.cassandra.cql3.ColumnSpecification; ++import org.apache.cassandra.cql3.CqlParser; +import org.apache.cassandra.cql3.QueryOptions; - import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UpdateParameters; +import org.apache.cassandra.cql3.functions.UDHelper; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.cql3.statements.CreateTypeStatement; +import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.cql3.statements.UpdateStatement; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.exceptions.SyntaxException; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Types; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; + +/** + * Utility to write SSTables. + * <p> + * Typical usage looks like: + * <pre> + * String type = CREATE TYPE myKs.myType (a int, b int)"; + * String schema = "CREATE TABLE myKs.myTable (" + * + " k int PRIMARY KEY," + * + " v1 text," + * + " v2 int," + * + " v3 myType," + * + ")"; + * String insert = "INSERT INTO myKs.myTable (k, v1, v2, v3) VALUES (?, ?, ?, ?)"; + * + * // Creates a new writer. You need to provide at least the directory where to write the created sstable, + * // the schema for the sstable to write and a (prepared) insert statement to use. If you do not use the + * // default partitioner (Murmur3Partitioner), you will also need to provide the partitioner in use, see + * // StressCQLSSTableWriter.Builder for more details on the available options. + * StressCQLSSTableWriter writer = StressCQLSSTableWriter.builder() + * .inDirectory("path/to/directory") + * .withType(type) + * .forTable(schema) + * .using(insert).build(); + * + * UserType myType = writer.getUDType("myType"); + * // Adds a nember of rows to the resulting sstable + * writer.addRow(0, "test1", 24, myType.newValue().setInt("a", 10).setInt("b", 20)); + * writer.addRow(1, "test2", null, null); + * writer.addRow(2, "test3", 42, myType.newValue().setInt("a", 30).setInt("b", 40)); + * + * // Close the writer, finalizing the sstable + * writer.close(); + * </pre> + * + * Please note that {@code StressCQLSSTableWriter} is <b>not</b> thread-safe (multiple threads cannot access the + * same instance). It is however safe to use multiple instances in parallel (even if those instance write + * sstables for the same table). + */ +public class StressCQLSSTableWriter implements Closeable +{ + public static final ByteBuffer UNSET_VALUE = ByteBufferUtil.UNSET_BYTE_BUFFER; + + static + { + DatabaseDescriptor.clientInitialization(false); + // Partitioner is not set in client mode. + if (DatabaseDescriptor.getPartitioner() == null) + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + } + + private final AbstractSSTableSimpleWriter writer; + private final UpdateStatement insert; + private final List<ColumnSpecification> boundNames; + private final List<TypeCodec> typeCodecs; + private final ColumnFamilyStore cfs; + + private StressCQLSSTableWriter(ColumnFamilyStore cfs, AbstractSSTableSimpleWriter writer, UpdateStatement insert, List<ColumnSpecification> boundNames) + { + this.cfs = cfs; + this.writer = writer; + this.insert = insert; + this.boundNames = boundNames; + this.typeCodecs = boundNames.stream().map(bn -> UDHelper.codecFor(UDHelper.driverType(bn.type))) + .collect(Collectors.toList()); + } + + /** + * Returns a new builder for a StressCQLSSTableWriter. + * + * @return the new builder. + */ + public static Builder builder() + { + return new Builder(); + } + + /** + * Adds a new row to the writer. + * <p> + * This is a shortcut for {@code addRow(Arrays.asList(values))}. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating by this writer). + * @return this writer. + */ + public StressCQLSSTableWriter addRow(Object... values) + throws InvalidRequestException, IOException + { + return addRow(Arrays.asList(values)); + } + + /** + * Adds a new row to the writer. + * <p> + * Each provided value type should correspond to the types of the CQL column + * the value is for. The correspondance between java type and CQL type is the + * same one than the one documented at + * www.datastax.com/drivers/java/2.0/apidocs/com/datastax/driver/core/DataType.Name.html#asJavaClass(). + * <p> + * If you prefer providing the values directly as binary, use + * {@link #rawAddRow} instead. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating by this writer). + * @return this writer. + */ + public StressCQLSSTableWriter addRow(List<Object> values) + throws InvalidRequestException, IOException + { + int size = Math.min(values.size(), boundNames.size()); + List<ByteBuffer> rawValues = new ArrayList<>(size); + + for (int i = 0; i < size; i++) + { + Object value = values.get(i); + rawValues.add(serialize(value, typeCodecs.get(i))); + } + + return rawAddRow(rawValues); + } + + /** + * Adds a new row to the writer. + * <p> + * This is equivalent to the other addRow methods, but takes a map whose + * keys are the names of the columns to add instead of taking a list of the + * values in the order of the insert statement used during construction of + * this write. + * <p> + * Please note that the column names in the map keys must be in lowercase unless + * the declared column name is a + * <a href="http://cassandra.apache.org/doc/cql3/CQL.html#identifiers">case-sensitive quoted identifier</a> + * (in which case the map key must use the exact case of the column). + * + * @param values a map of colum name to column values representing the new + * row to add. Note that if a column is not part of the map, it's value will + * be {@code null}. If the map contains keys that does not correspond to one + * of the column of the insert statement used when creating this writer, the + * the corresponding value is ignored. + * @return this writer. + */ + public StressCQLSSTableWriter addRow(Map<String, Object> values) + throws InvalidRequestException, IOException + { + int size = boundNames.size(); + List<ByteBuffer> rawValues = new ArrayList<>(size); + for (int i = 0; i < size; i++) + { + ColumnSpecification spec = boundNames.get(i); + Object value = values.get(spec.name.toString()); + rawValues.add(serialize(value, typeCodecs.get(i))); + } + return rawAddRow(rawValues); + } + + /** + * Adds a new row to the writer given already serialized values. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating by this writer) as binary. + * @return this writer. + */ + public StressCQLSSTableWriter rawAddRow(ByteBuffer... values) + throws InvalidRequestException, IOException + { + return rawAddRow(Arrays.asList(values)); + } + + /** + * Adds a new row to the writer given already serialized values. + * <p> + * This is a shortcut for {@code rawAddRow(Arrays.asList(values))}. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating by this writer) as binary. + * @return this writer. + */ + public StressCQLSSTableWriter rawAddRow(List<ByteBuffer> values) + throws InvalidRequestException, IOException + { + if (values.size() != boundNames.size()) + throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size())); + + QueryOptions options = QueryOptions.forInternalCalls(null, values); + List<ByteBuffer> keys = insert.buildPartitionKeyNames(options); + SortedSet<Clustering> clusterings = insert.createClustering(options); + + long now = System.currentTimeMillis() * 1000; + // Note that we asks indexes to not validate values (the last 'false' arg below) because that triggers a 'Keyspace.open' + // and that forces a lot of initialization that we don't want. + UpdateParameters params = new UpdateParameters(insert.cfm, + insert.updatedColumns(), + options, + insert.getTimestamp(now, options), + insert.getTimeToLive(options), + Collections.<DecoratedKey, Partition>emptyMap()); + + try + { + for (ByteBuffer key : keys) + { + for (Clustering clustering : clusterings) + insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params); + } + return this; + } + catch (SSTableSimpleUnsortedWriter.SyncException e) + { + // If we use a BufferedWriter and had a problem writing to disk, the IOException has been + // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOE. + throw (IOException)e.getCause(); + } + } + + /** + * Adds a new row to the writer given already serialized values. + * <p> + * This is equivalent to the other rawAddRow methods, but takes a map whose + * keys are the names of the columns to add instead of taking a list of the + * values in the order of the insert statement used during construction of + * this write. + * + * @param values a map of colum name to column values representing the new + * row to add. Note that if a column is not part of the map, it's value will + * be {@code null}. If the map contains keys that does not correspond to one + * of the column of the insert statement used when creating this writer, the + * the corresponding value is ignored. + * @return this writer. + */ + public StressCQLSSTableWriter rawAddRow(Map<String, ByteBuffer> values) + throws InvalidRequestException, IOException + { + int size = Math.min(values.size(), boundNames.size()); + List<ByteBuffer> rawValues = new ArrayList<>(size); + for (int i = 0; i < size; i++) + { + ColumnSpecification spec = boundNames.get(i); + rawValues.add(values.get(spec.name.toString())); + } + return rawAddRow(rawValues); + } + + /** + * Returns the User Defined type, used in this SSTable Writer, that can + * be used to create UDTValue instances. + * + * @param dataType name of the User Defined type + * @return user defined type + */ + public com.datastax.driver.core.UserType getUDType(String dataType) + { + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(insert.keyspace()); + UserType userType = ksm.types.getNullable(ByteBufferUtil.bytes(dataType)); + return (com.datastax.driver.core.UserType) UDHelper.driverType(userType); + } + + /** + * Close this writer. + * <p> + * This method should be called, otherwise the produced sstables are not + * guaranteed to be complete (and won't be in practice). + */ + public void close() throws IOException + { + writer.close(); + } + + private ByteBuffer serialize(Object value, TypeCodec codec) + { + if (value == null || value == UNSET_VALUE) + return (ByteBuffer) value; + + return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED); + } + /** + * The writer loads data in directories corresponding to how they laid out on the server. + * <p> + * {keyspace}/{table-cfid}/ + * + * This method can be used to fetch the innermost directory with the sstable components + * @return The directory containing the sstable components + */ + public File getInnermostDirectory() + { + return cfs.getDirectories().getDirectoryForNewSSTables(); + } + + /** + * A Builder for a StressCQLSSTableWriter object. + */ + public static class Builder + { + private final List<File> directoryList; + private ColumnFamilyStore cfs; + + protected SSTableFormat.Type formatType = null; + + private Boolean makeRangeAware = false; + + private CreateTableStatement.RawStatement schemaStatement; + private final List<CreateTypeStatement> typeStatements; + private UpdateStatement.ParsedInsert insertStatement; + private IPartitioner partitioner; + + private boolean sorted = false; + private long bufferSizeInMB = 128; + + protected Builder() + { + this.typeStatements = new ArrayList<>(); + this.directoryList = new ArrayList<>(); + } + + /** + * The directory where to write the sstables. + * <p> + * This is a mandatory option. + * + * @param directory the directory to use, which should exists and be writable. + * @return this builder. + * + * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable. + */ + public Builder inDirectory(String directory) + { + return inDirectory(new File(directory)); + } + + /** + * The directory where to write the sstables (mandatory option). + * <p> + * This is a mandatory option. + * + * @param directory the directory to use, which should exist and be writable. + * @return this builder. + * + * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable. + */ + public Builder inDirectory(File directory) + { + if (!directory.exists()) + throw new IllegalArgumentException(directory + " doesn't exists"); + if (!directory.canWrite()) + throw new IllegalArgumentException(directory + " exists but is not writable"); + + directoryList.add(directory); + return this; + } + + /** + * A pre-instanciated ColumnFamilyStore + * <p> + * This is can be used in place of inDirectory and forTable + * + * @see #inDirectory(File) + * + * @param cfs the list of directories to use, which should exist and be writable. + * @return this builder. + * + * @throws IllegalArgumentException if a directory doesn't exist or is not writable. + */ + public Builder withCfs(ColumnFamilyStore cfs) + { + this.cfs = cfs; + return this; + } + + + public Builder withType(String typeDefinition) throws SyntaxException + { + typeStatements.add(parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE")); + return this; + } + + /** + * The schema (CREATE TABLE statement) for the table for which sstable are to be created. + * <p> + * Please note that the provided CREATE TABLE statement <b>must</b> use a fully-qualified + * table name, one that include the keyspace name. + * <p> + * This is a mandatory option. + * + * @param schema the schema of the table for which sstables are to be created. + * @return this builder. + * + * @throws IllegalArgumentException if {@code schema} is not a valid CREATE TABLE statement + * or does not have a fully-qualified table name. + */ + public Builder forTable(String schema) + { + this.schemaStatement = parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"); + return this; + } + + /** + * The partitioner to use. + * <p> + * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used + * by the cluster for which the SSTables are created, you need to use this method to + * provide the correct partitioner. + * + * @param partitioner the partitioner to use. + * @return this builder. + */ + public Builder withPartitioner(IPartitioner partitioner) + { + this.partitioner = partitioner; + return this; + } + + + /** + * Specify if the sstable writer should be vnode range aware. + * This will create a sstable per vnode range. + * + * @param makeRangeAware + * @return + */ + public Builder rangeAware(boolean makeRangeAware) + { + this.makeRangeAware = makeRangeAware; + return this; + } + + /** + * The INSERT statement defining the order of the values to add for a given CQL row. + * <p> + * Please note that the provided INSERT statement <b>must</b> use a fully-qualified + * table name, one that include the keyspace name. Morewover, said statement must use + * bind variables since it is those bind variables that will be bound to values by the + * resulting writer. + * <p> + * This is a mandatory option, and this needs to be called after foTable(). + * + * @param insert an insertion statement that defines the order + * of column values to use. + * @return this builder. + * + * @throws IllegalArgumentException if {@code insertStatement} is not a valid insertion + * statement, does not have a fully-qualified table name or have no bind variables. + */ + public Builder using(String insert) + { + this.insertStatement = parseStatement(insert, UpdateStatement.ParsedInsert.class, "INSERT"); + return this; + } + + /** + * The size of the buffer to use. + * <p> + * This defines how much data will be buffered before being written as + * a new SSTable. This correspond roughly to the data size that will have the created + * sstable. + * <p> + * The default is 128MB, which should be reasonable for a 1GB heap. If you experience + * OOM while using the writer, you should lower this value. + * + * @param size the size to use in MB. + * @return this builder. + */ + public Builder withBufferSizeInMB(int size) + { + this.bufferSizeInMB = size; + return this; + } + + /** + * Creates a StressCQLSSTableWriter that expects sorted inputs. + * <p> + * If this option is used, the resulting writer will expect rows to be + * added in SSTable sorted order (and an exception will be thrown if that + * is not the case during insertion). The SSTable sorted order means that + * rows are added such that their partition key respect the partitioner + * order. + * <p> + * You should thus only use this option is you know that you can provide + * the rows in order, which is rarely the case. If you can provide the + * rows in order however, using this sorted might be more efficient. + * <p> + * Note that if used, some option like withBufferSizeInMB will be ignored. + * + * @return this builder. + */ + public Builder sorted() + { + this.sorted = true; + return this; + } + + @SuppressWarnings("resource") + public StressCQLSSTableWriter build() + { + if (directoryList.isEmpty() && cfs == null) + throw new IllegalStateException("No output directories specified, you should provide a directory with inDirectory()"); + if (schemaStatement == null && cfs == null) + throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()"); + if (insertStatement == null) + throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); + + synchronized (StressCQLSSTableWriter.class) + { + if (cfs == null) + cfs = createOfflineTable(schemaStatement, typeStatements, directoryList); + + if (partitioner == null) + partitioner = cfs.getPartitioner(); + + Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert(); + AbstractSSTableSimpleWriter writer = sorted + ? new SSTableSimpleWriter(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata, preparedInsert.left.updatedColumns()) + : new SSTableSimpleUnsortedWriter(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata, preparedInsert.left.updatedColumns(), bufferSizeInMB); + + if (formatType != null) + writer.setSSTableFormatType(formatType); + + writer.setRangeAwareWriting(makeRangeAware); + + return new StressCQLSSTableWriter(cfs, writer, preparedInsert.left, preparedInsert.right); + } + } + + private static void createTypes(String keyspace, List<CreateTypeStatement> typeStatements) + { + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); + Types.RawBuilder builder = Types.rawBuilder(keyspace); + for (CreateTypeStatement st : typeStatements) + st.addToRawBuilder(builder); + + ksm = ksm.withSwapped(builder.build()); + Schema.instance.setKeyspaceMetadata(ksm); + } + + public static ColumnFamilyStore createOfflineTable(String schema, List<File> directoryList) + { + return createOfflineTable(parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"), Collections.EMPTY_LIST, directoryList); + } + + /** + * Creates the table according to schema statement + * with specified data directories + */ + public static ColumnFamilyStore createOfflineTable(CreateTableStatement.RawStatement schemaStatement, List<CreateTypeStatement> typeStatements, List<File> directoryList) + { + String keyspace = schemaStatement.keyspace(); + + if (Schema.instance.getKSMetaData(keyspace) == null) + Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1))); + + createTypes(keyspace, typeStatements); + + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); + + CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily()); + assert cfMetaData == null; + + CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement; + statement.validate(ClientState.forInternalCalls()); + + //Build metatdata with a portable cfId + cfMetaData = statement.metadataBuilder() + .withId(CFMetaData.generateLegacyCfId(keyspace, statement.columnFamily())) + .build() + .params(statement.params()); + + Keyspace.setInitialized(); + Directories directories = new Directories(cfMetaData, directoryList.stream().map(Directories.DataDirectory::new).collect(Collectors.toList())); + + Keyspace ks = Keyspace.openWithoutSSTables(keyspace); + ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(ks, cfMetaData.cfName, cfMetaData, directories, false, false, true); + + ks.initCfCustom(cfs); + Schema.instance.load(cfs.metadata); + Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfs.metadata))); + + return cfs; + } + + /** + * Prepares insert statement for writing data to SSTable + * + * @return prepared Insert statement and it's bound names + */ + private Pair<UpdateStatement, List<ColumnSpecification>> prepareInsert() + { + ParsedStatement.Prepared cqlStatement = insertStatement.prepare(); + UpdateStatement insert = (UpdateStatement) cqlStatement.statement; + insert.validate(ClientState.forInternalCalls()); + + if (insert.hasConditions()) + throw new IllegalArgumentException("Conditional statements are not supported"); + if (insert.isCounter()) + throw new IllegalArgumentException("Counter update statements are not supported"); + if (cqlStatement.boundNames.isEmpty()) + throw new IllegalArgumentException("Provided insert statement has no bind variables"); + + return Pair.create(insert, cqlStatement.boundNames); + } + } + + public static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type) + { + try + { - ParsedStatement stmt = QueryProcessor.parseStatement(query); ++ ParsedStatement stmt = CQLFragmentParser.parseAnyUnhandled(CqlParser::query, query); + + if (!stmt.getClass().equals(klass)) + throw new IllegalArgumentException("Invalid query, must be a " + type + " statement but was: " + stmt.getClass()); + + return klass.cast(stmt); + } - catch (RequestValidationException e) ++ catch (RecognitionException | RequestValidationException e) + { + throw new IllegalArgumentException(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c95d376/tools/stress/src/org/apache/cassandra/stress/Stress.java ---------------------------------------------------------------------- diff --cc tools/stress/src/org/apache/cassandra/stress/Stress.java index 925f709,bc6d027..3c0fa96 --- a/tools/stress/src/org/apache/cassandra/stress/Stress.java +++ b/tools/stress/src/org/apache/cassandra/stress/Stress.java @@@ -56,23 -54,11 +56,23 @@@ public final class Stres public static void main(String[] arguments) throws Exception { - if (FBUtilities.isWindows()) + if (FBUtilities.isWindows) WindowsTimer.startTimerPeriod(1); + int exitCode = run(arguments); + + if (FBUtilities.isWindows) + WindowsTimer.endTimerPeriod(1); + + System.exit(exitCode); + } + + + private static int run(String[] arguments) + { try { - DatabaseDescriptor.toolInitialization(); ++ DatabaseDescriptor.clientInitialization(); final StressSettings settings; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c95d376/tools/stress/src/org/apache/cassandra/stress/StressProfile.java ---------------------------------------------------------------------- diff --cc tools/stress/src/org/apache/cassandra/stress/StressProfile.java index 011b7d7,c19e0cd..84d4abd --- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java @@@ -28,19 -28,16 +28,19 @@@ import java.io.Serializable import java.net.URI; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import com.google.common.base.Function; - import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.AlreadyExistsException; ++import org.antlr.runtime.RecognitionException; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement; +import org.apache.cassandra.config.ColumnDefinition; - import org.apache.cassandra.cql3.QueryProcessor; - import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement; ++import org.apache.cassandra.cql3.CQLFragmentParser; ++import org.apache.cassandra.cql3.CqlParser; import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.SyntaxException; @@@ -166,10 -118,10 +166,10 @@@ public class StressProfile implements S { try { -- String name = ((CreateKeyspaceStatement) QueryProcessor.parseStatement(keyspaceCql)).keyspace(); ++ String name = CQLFragmentParser.parseAnyUnhandled(CqlParser::createKeyspaceStatement, keyspaceCql).keyspace(); assert name.equalsIgnoreCase(keyspaceName) : "Name in keyspace_definition doesn't match keyspace property: '" + name + "' != '" + keyspaceName + "'"; } -- catch (SyntaxException e) ++ catch (RecognitionException | SyntaxException e) { throw new IllegalArgumentException("There was a problem parsing the keyspace cql: " + e.getMessage()); } @@@ -183,10 -135,10 +183,10 @@@ { try { - String name = CFMetaData.compile(tableCql, keyspaceName).cfName; - String name = ((CreateTableStatement.RawStatement) QueryProcessor.parseStatement(tableCql)).columnFamily(); ++ String name = CQLFragmentParser.parseAnyUnhandled(CqlParser::createTableStatement, tableCql).columnFamily(); assert name.equalsIgnoreCase(tableName) : "Name in table_definition doesn't match table property: '" + name + "' != '" + tableName + "'"; } -- catch (RuntimeException e) ++ catch (RecognitionException | RuntimeException e) { throw new IllegalArgumentException("There was a problem parsing the table cql: " + e.getMessage()); }