Merge branch 'cassandra-1.2' into cassandra-2.0 Conflicts: src/java/org/apache/cassandra/db/ConsistencyLevel.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8fd7db6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8fd7db6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8fd7db6 Branch: refs/heads/trunk Commit: f8fd7db67e3ff7268bd2bc96bd1373d54664b7dd Parents: 5fa6055 32d7cb5 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Jan 8 17:22:55 2014 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Jan 8 17:22:55 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ConsistencyLevel.java | 59 +++++++++++++------- .../locator/AbstractReplicationStrategy.java | 2 +- 3 files changed, 41 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index faf52a4,cba97d0..d0b63a0 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -5,29 -8,10 +5,30 @@@ Merged from 1.2 * fsync compression metadata (CASSANDRA-6531) * Validate CF existence on execution for prepared statement (CASSANDRA-6535) * Add ability to throttle batchlog replay (CASSANDRA-6550) + * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545) -1.2.13 +2.0.4 + * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418) + * add StorageService.stopDaemon() (CASSANDRA-4268) + * add IRE for invalid CF supplied to get_count (CASSANDRA-5701) + * add client encryption support to sstableloader (CASSANDRA-6378) + * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468) + * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496) + * Fix assertion failure in filterColdSSTables (CASSANDRA-6483) + * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008) + * Fix cleanup ClassCastException (CASSANDRA-6462) + * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410) + * Allow specifying datacenters to participate in a repair (CASSANDRA-6218) + * Fix divide-by-zero in PCI (CASSANDRA-6403) + * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284) + * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395) + * Expose a total memtable size metric for a CF (CASSANDRA-6391) + * cqlsh: handle symlinks properly (CASSANDRA-6425) + * Fix potential infinite loop when paging query with IN (CASSANDRA-6464) + * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447) + * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527) +Merged from 1.2: * Improved error message on bad properties in DDL queries (CASSANDRA-6453) * Randomize batchlog candidates selection (CASSANDRA-6481) * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ConsistencyLevel.java index 0f6aba7,3737c73..6d04314 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@@ -89,12 -88,19 +89,19 @@@ public enum ConsistencyLeve return codeIdx[code]; } - private int quorumFor(Table table) ++ private int quorumFor(Keyspace keyspace) + { - return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1; ++ return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1; + } + - private int localQuorumFor(Table table, String dc) + private int localQuorumFor(Keyspace keyspace, String dc) { - return (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1; - return (table.getReplicationStrategy() instanceof NetworkTopologyStrategy) - ? (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1 - : quorumFor(table); ++ return (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) ++ ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1 ++ : quorumFor(keyspace); } - public int blockFor(Table table) + public int blockFor(Keyspace keyspace) { switch (this) { @@@ -108,17 -114,24 +115,24 @@@ case THREE: return 3; case QUORUM: - return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1; - return quorumFor(table); ++ return quorumFor(keyspace); case ALL: - return table.getReplicationStrategy().getReplicationFactor(); + return keyspace.getReplicationStrategy().getReplicationFactor(); case LOCAL_QUORUM: - return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter()); + return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter()); case EACH_QUORUM: - NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); - int n = 0; - for (String dc : strategy.getDatacenters()) - n += localQuorumFor(keyspace, dc); - return n; - if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy) ++ if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) + { - NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy(); ++ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); + int n = 0; + for (String dc : strategy.getDatacenters()) - n += localQuorumFor(table, dc); ++ n += localQuorumFor(keyspace, dc); + return n; + } + else + { - return quorumFor(table); ++ return quorumFor(keyspace); + } default: throw new UnsupportedOperationException("Invalid consistency level: " + toString()); } @@@ -211,16 -224,20 +225,20 @@@ case LOCAL_ONE: return countLocalEndpoints(liveEndpoints) >= 1; case LOCAL_QUORUM: - return countLocalEndpoints(liveEndpoints) >= blockFor(table); + return countLocalEndpoints(liveEndpoints) >= blockFor(keyspace); case EACH_QUORUM: - for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet()) - if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy) ++ if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { - if (entry.getValue() < localQuorumFor(keyspace, entry.getKey())) - return false; - for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet()) ++ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet()) + { - if (entry.getValue() < localQuorumFor(table, entry.getKey())) ++ if (entry.getValue() < localQuorumFor(keyspace, entry.getKey())) + return false; + } + return true; } - return true; + // Fallthough on purpose for SimpleStrategy default: - return Iterables.size(liveEndpoints) >= blockFor(table); + return Iterables.size(liveEndpoints) >= blockFor(keyspace); } } @@@ -251,14 -268,18 +269,18 @@@ } break; case EACH_QUORUM: - for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet()) - if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy) ++ if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { - int dcBlockFor = localQuorumFor(keyspace, entry.getKey()); - int dcLive = entry.getValue(); - if (dcLive < dcBlockFor) - throw new UnavailableException(this, dcBlockFor, dcLive); - for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet()) ++ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet()) + { - int dcBlockFor = localQuorumFor(table, entry.getKey()); ++ int dcBlockFor = localQuorumFor(keyspace, entry.getKey()); + int dcLive = entry.getValue(); + if (dcLive < dcBlockFor) + throw new UnavailableException(this, dcBlockFor, dcLive); + } + break; } - break; + // Fallthough on purpose for SimpleStrategy default: int live = Iterables.size(liveEndpoints); if (live < blockFor) @@@ -281,42 -302,8 +303,39 @@@ } } - public void validateForWrite(String table) throws InvalidRequestException + public void validateForWrite(String keyspaceName) throws InvalidRequestException + { + switch (this) + { - case EACH_QUORUM: - requireNetworkTopologyStrategy(keyspaceName); - break; + case SERIAL: + case LOCAL_SERIAL: + throw new InvalidRequestException("You must use conditional updates for serializable writes"); + } + } + + // This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL + public void validateForCasCommit(String keyspaceName) throws InvalidRequestException + { + switch (this) + { + case EACH_QUORUM: + requireNetworkTopologyStrategy(keyspaceName); + break; + case SERIAL: + case LOCAL_SERIAL: + throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\""); + } + } + + public void validateForCas() throws InvalidRequestException { + if (!isSerialConsistency()) + throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL"); + } + + public boolean isSerialConsistency() + { + return this == SERIAL || this == LOCAL_SERIAL; } public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index f4a2662,e4dd422..df33813 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@@ -132,20 -132,20 +132,20 @@@ public abstract class AbstractReplicati if (consistency_level.isDatacenterLocal()) { // block for in this context will be localnodes block. - return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType); + return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); } - else if (consistency_level == ConsistencyLevel.EACH_QUORUM) + else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) { - return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType); + return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); } - return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType); + return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); } - private Table getTable() + private Keyspace getKeyspace() { - if (table == null) - table = Table.open(tableName); - return table; + if (keyspace == null) + keyspace = Keyspace.open(keyspaceName); + return keyspace; } /**