Updated Branches: refs/heads/trunk 75c4cfa5c -> aa8989d9b
(cql3) Allow defining default consistency levels patch by slebresne; reviewed by yukim for CASSANDRA-4448 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa8989d9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa8989d9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa8989d9 Branch: refs/heads/trunk Commit: aa8989d9b98f35a95c988f0b664a0b4ffd232bac Parents: 75c4cfa Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Sep 13 21:34:34 2012 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Sep 13 21:34:34 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 42 ++++++++++++++- src/java/org/apache/cassandra/cql3/CFPropDefs.java | 43 +++++++++++++++ src/java/org/apache/cassandra/cql3/Cql.g | 2 +- .../cassandra/cql3/statements/BatchStatement.java | 27 +++++++++- .../cql3/statements/ModificationStatement.java | 9 +++- .../cassandra/cql3/statements/SelectStatement.java | 12 +++- 7 files changed, 127 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 371af7c..50ecbd1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -62,6 +62,7 @@ * fix counter add/get using CQL2 and CQL3 in stress tool (CASSANDRA-4633) * Add sstable count per level to cfstats (CASSANDRA-4537) * (cql3) Add ALTER KEYSPACE statement (CASSANDRA-4611) + * (cql3) Allow defining default consistency levels (CASSANDRA-4448) 1.1.6 http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index cb8951c..adbd853 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -131,6 +131,8 @@ public final class CFMetaData + "value_alias text," + "column_aliases text," + "compaction_strategy_options text," + + "default_read_consistency text," + + "default_write_consistency text," + "PRIMARY KEY (keyspace_name, columnfamily_name)" + ") WITH COMMENT='ColumnFamily definitions' AND gc_grace_seconds=8640"); public static final CFMetaData SchemaColumnsCf = compile(10, "CREATE TABLE " + SystemTable.SCHEMA_COLUMNS_CF + "(" @@ -245,6 +247,11 @@ public final class CFMetaData public CompressionParameters compressionParameters; + // Default consistency levels for CQL3. The default for those values is ONE, + // but we keep the internal default to null as it help handling thrift compatibility + private ConsistencyLevel readConsistencyLevel; + private ConsistencyLevel writeConsistencyLevel; + // Processed infos used by CQL. This can be fully reconstructed from the CFMedata, // so it's not saved on disk. It is however costlyish to recreate for each query // so we cache it here (and update on each relevant CFMetadata change) @@ -268,6 +275,8 @@ public final class CFMetaData public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;} public CFMetaData bloomFilterFpChance(Double prop) {bloomFilterFpChance = prop; return this;} public CFMetaData caching(Caching prop) {caching = prop; return this;} + public CFMetaData defaultReadCL(ConsistencyLevel prop) {readConsistencyLevel = prop; return this;} + public CFMetaData defaultWriteCL(ConsistencyLevel prop) {writeConsistencyLevel = prop; return this;} public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc) { @@ -433,7 +442,9 @@ public final class CFMetaData .compactionStrategyOptions(oldCFMD.compactionStrategyOptions) .compressionParameters(oldCFMD.compressionParameters) .bloomFilterFpChance(oldCFMD.bloomFilterFpChance) - .caching(oldCFMD.caching); + .caching(oldCFMD.caching) + .defaultReadCL(oldCFMD.readConsistencyLevel) + .defaultWriteCL(oldCFMD.writeConsistencyLevel); } /** @@ -519,6 +530,16 @@ public final class CFMetaData return valueAlias; } + public ConsistencyLevel getReadConsistencyLevel() + { + return readConsistencyLevel == null ? ConsistencyLevel.ONE : readConsistencyLevel; + } + + public ConsistencyLevel getWriteConsistencyLevel() + { + return writeConsistencyLevel == null ? ConsistencyLevel.ONE : writeConsistencyLevel; + } + public CompressionParameters compressionParameters() { return compressionParameters; @@ -581,6 +602,8 @@ public final class CFMetaData .append(compressionParameters, rhs.compressionParameters) .append(bloomFilterFpChance, rhs.bloomFilterFpChance) .append(caching, rhs.caching) + .append(readConsistencyLevel, rhs.readConsistencyLevel) + .append(writeConsistencyLevel, rhs.writeConsistencyLevel) .isEquals(); } @@ -611,6 +634,8 @@ public final class CFMetaData .append(compressionParameters) .append(bloomFilterFpChance) .append(caching) + .append(readConsistencyLevel) + .append(writeConsistencyLevel) .toHashCode(); } @@ -799,6 +824,11 @@ public final class CFMetaData } if (cfm.valueAlias != null) valueAlias = cfm.valueAlias; + if (cfm.readConsistencyLevel != null) + readConsistencyLevel = cfm.readConsistencyLevel; + if (cfm.writeConsistencyLevel != null) + writeConsistencyLevel = cfm.writeConsistencyLevel; + bloomFilterFpChance = cfm.bloomFilterFpChance; caching = cfm.caching; @@ -1258,6 +1288,10 @@ public final class CFMetaData : Column.create(valueAlias, timestamp, cfName, "value_alias")); cf.addColumn(Column.create(json(aliasesAsStrings(columnAliases)), timestamp, cfName, "column_aliases")); cf.addColumn(Column.create(json(compactionStrategyOptions), timestamp, cfName, "compaction_strategy_options")); + cf.addColumn(readConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_read_consistency") + : Column.create(readConsistencyLevel.toString(), timestamp, cfName, "default_read_consistency")); + cf.addColumn(writeConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_write_consistency") + : Column.create(writeConsistencyLevel.toString(), timestamp, cfName, "default_write_consistency")); } // Package protected for use by tests @@ -1302,6 +1336,10 @@ public final class CFMetaData if (result.has("value_alias")) cfm.valueAlias(result.getBytes("value_alias")); cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options"))); + if (result.has("default_read_consistency")) + cfm.defaultReadCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_read_consistency"))); + if (result.has("default_write_consistency")) + cfm.defaultWriteCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_write_consistency"))); return cfm; } @@ -1465,6 +1503,8 @@ public final class CFMetaData .append("compressionOptions", compressionParameters.asThriftOptions()) .append("bloomFilterFpChance", bloomFilterFpChance) .append("caching", caching) + .append("readConsistencyLevel", readConsistencyLevel) + .append("writeConsistencyLevel", writeConsistencyLevel) .toString(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/CFPropDefs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java index fb5f365..cb78db0 100644 --- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java +++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java @@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.io.compress.CompressionParameters; @@ -46,6 +48,9 @@ public class CFPropDefs extends PropertyDefinitions public static final String KW_REPLICATEONWRITE = "replicate_on_write"; public static final String KW_CACHING = "caching"; public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance"; + public static final String KW_DEFAULT_R_CONSISTENCY = "default_read_consistency"; + public static final String KW_DEFAULT_W_CONSISTENCY = "default_write_consistency"; + public static final String KW_COMPACTION = "compaction"; public static final String KW_COMPRESSION = "compression"; @@ -65,6 +70,8 @@ public class CFPropDefs extends PropertyDefinitions keywords.add(KW_BF_FP_CHANCE); keywords.add(KW_COMPACTION); keywords.add(KW_COMPRESSION); + keywords.add(KW_DEFAULT_W_CONSISTENCY); + keywords.add(KW_DEFAULT_R_CONSISTENCY); obsoleteKeywords.add("compaction_strategy_class"); obsoleteKeywords.add("compaction_strategy_options"); @@ -136,6 +143,42 @@ public class CFPropDefs extends PropertyDefinitions if (!getCompressionOptions().isEmpty()) cfm.compressionParameters(CompressionParameters.create(getCompressionOptions())); + + try + { + ConsistencyLevel readCL = getConsistencyLevel(KW_DEFAULT_R_CONSISTENCY); + if (readCL != null) + { + readCL.validateForRead(cfm.ksName); + cfm.defaultReadCL(readCL); + } + ConsistencyLevel writeCL = getConsistencyLevel(KW_DEFAULT_W_CONSISTENCY); + if (writeCL != null) + { + writeCL.validateForWrite(cfm.ksName); + cfm.defaultWriteCL(writeCL); + } + } + catch (InvalidRequestException e) + { + throw new ConfigurationException(e.getMessage(), e.getCause()); + } + } + + public ConsistencyLevel getConsistencyLevel(String key) throws ConfigurationException, SyntaxException + { + String value = getSimple(key); + if (value == null) + return null; + + try + { + return Enum.valueOf(ConsistencyLevel.class, value); + } + catch (IllegalArgumentException e) + { + throw new ConfigurationException(String.format("Invalid consistency level value: %s", value)); + } } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/Cql.g ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g index 01dbafd..1379b9a 100644 --- a/src/java/org/apache/cassandra/cql3/Cql.g +++ b/src/java/org/apache/cassandra/cql3/Cql.g @@ -189,7 +189,7 @@ useStatement returns [UseStatement stmt] selectStatement returns [SelectStatement.RawStatement expr] @init { boolean isCount = false; - ConsistencyLevel cLevel = ConsistencyLevel.ONE; + ConsistencyLevel cLevel = null; int limit = 10000; Map<ColumnIdentifier, Boolean> orderings = new LinkedHashMap<ColumnIdentifier, Boolean>(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index ac78c89..246a97b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -23,9 +23,10 @@ import java.util.concurrent.TimeoutException; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.cql3.*; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.RowMutation; -import org.apache.cassandra.db.CounterMutation; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.thrift.RequestType; @@ -76,11 +77,21 @@ public class BatchStatement extends ModificationStatement } } + @Override + public ConsistencyLevel getConsistencyLevel() + { + // We have validated that either the consistency is set, or all statements have the same default CL (see validate()) + return isSetConsistencyLevel() + ? super.getConsistencyLevel() + : (statements.isEmpty() ? ConsistencyLevel.ONE : statements.get(0).getConsistencyLevel()); + } + public void validate(ClientState state) throws InvalidRequestException { if (getTimeToLive() != 0) throw new InvalidRequestException("Global TTL on the BATCH statement is not supported."); + ConsistencyLevel cLevel = null; for (ModificationStatement statement : statements) { if (statement.isSetConsistencyLevel()) @@ -92,7 +103,19 @@ public class BatchStatement extends ModificationStatement if (statement.getTimeToLive() < 0) throw new InvalidRequestException("A TTL must be greater or equal to 0"); - getConsistencyLevel().validateForWrite(statement.keyspace()); + if (isSetConsistencyLevel()) + { + getConsistencyLevel().validateForWrite(statement.keyspace()); + } + else + { + // If no consistency is set for the batch, we need all the CF in the batch to have the same default consistency level, + // otherwise the batch is invalid (i.e. the user must explicitely set the CL) + ConsistencyLevel stmtCL = statement.getConsistencyLevel(); + if (cLevel != null && cLevel != stmtCL) + throw new InvalidRequestException("The tables involved in the BATCH have different default write consistency, you must explicitely set the BATCH consitency level with USING CONSISTENCY"); + cLevel = stmtCL; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 291ecd9..b960704 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -23,6 +23,8 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.*; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.db.*; @@ -31,7 +33,6 @@ import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.thrift.RequestType; import org.apache.cassandra.thrift.ThriftValidation; @@ -80,7 +81,11 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt public ConsistencyLevel getConsistencyLevel() { - return (cLevel != null) ? cLevel : defaultConsistency; + if (cLevel != null) + return cLevel; + + CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily()); + return cfm == null ? ConsistencyLevel.ONE : cfm.getWriteConsistencyLevel(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa8989d9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 07c8453..1c2b631 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -180,7 +180,7 @@ public class SelectStatement implements CQLStatement try { - return StorageProxy.read(commands, parameters.consistencyLevel); + return StorageProxy.read(commands, getConsistencyLevel()); } catch (IOException e) { @@ -205,7 +205,7 @@ public class SelectStatement implements CQLStatement getLimit(), true, // limit by columns, not keys false), - parameters.consistencyLevel); + getConsistencyLevel()); } catch (IOException e) { @@ -294,6 +294,11 @@ public class SelectStatement implements CQLStatement return sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) ? parameters.limit + 1 : parameters.limit; } + private ConsistencyLevel getConsistencyLevel() + { + return parameters.consistencyLevel == null ? cfDef.cfm.getReadConsistencyLevel() : parameters.consistencyLevel; + } + private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException { List<ByteBuffer> keys = new ArrayList<ByteBuffer>(); @@ -917,7 +922,8 @@ public class SelectStatement implements CQLStatement public ParsedStatement.Prepared prepare() throws InvalidRequestException { CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); - parameters.consistencyLevel.validateForRead(keyspace()); + if (parameters.consistencyLevel != null) + parameters.consistencyLevel.validateForRead(keyspace()); if (parameters.limit <= 0) throw new InvalidRequestException("LIMIT must be strictly positive");