Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 f21c88851 -> cc2478d4c
Allow EACH_QUORUM for reads patch by Carl Yeksigian; reviewed by Ariel Weisberg for CASSANDRA-9602 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc2478d4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc2478d4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc2478d4 Branch: refs/heads/cassandra-3.0 Commit: cc2478d4ce5d0cabb150489ebec5cda1442bc174 Parents: f21c888 Author: Carl Yeksigian <c...@apache.org> Authored: Tue Oct 6 11:05:04 2015 -0400 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Wed Oct 14 16:16:58 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 1 + .../apache/cassandra/db/ConsistencyLevel.java | 40 ++++++++++++++++++-- .../exceptions/UnavailableException.java | 5 +++ 4 files changed, 44 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc2478d4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1a43cfc..6b7b4eb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0-rc2 + * Allow EACH_QUORUM for reads (CASSANDRA-9602) * Fix potential ClassCastException while upgrading (CASSANDRA-10468) * Fix NPE in MVs on update (CASSANDRA-10503) * Only include modified cell data in indexing deltas (CASSANDRA-10438) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc2478d4/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index c36481e..1176dcd 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,6 +18,7 @@ using the provided 'sstableupgrade' tool. New features ------------ + - EACH_QUORUM is now a supported consistency level for read requests.
- Support for IN restrictions on any partition key component or clustering key as well as support for EQ and IN multicolumn restrictions has been added to UPDATE and DELETE statement. http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc2478d4/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 85ec0f3..c6655dc 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -182,6 +182,11 @@ public enum ConsistencyLevel public List<InetAddress> filterForQuery(Keyspace keyspace, List<InetAddress> liveEndpoints, ReadRepairDecision readRepair) { + // If we are doing an each quorum, we have to make sure that the endpoints we select provide a quorum for each + // data center + if (this == EACH_QUORUM) + return filterForEachQuorum(keyspace, liveEndpoints, readRepair); + /* * Endpoints are expected to be restricted to live replicas, sorted by snitch preference. 
* For LOCAL_QUORUM, move local-DC replicas in front first as we need them there whether @@ -217,6 +222,37 @@ public enum ConsistencyLevel } } + private List<InetAddress> filterForEachQuorum(Keyspace keyspace, List<InetAddress> liveEndpoints, ReadRepairDecision readRepair) + { + NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); + + // quickly drop out if read repair is GLOBAL, since we just use all of the live endpoints + if (readRepair == ReadRepairDecision.GLOBAL) + return liveEndpoints; + + Map<String, List<InetAddress>> dcsEndpoints = new HashMap<>(); + for (String dc: strategy.getDatacenters()) + dcsEndpoints.put(dc, new ArrayList<>()); + + for (InetAddress add : liveEndpoints) + { + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(add); + dcsEndpoints.get(dc).add(add); + } + + List<InetAddress> waitSet = new ArrayList<>(); + for (Map.Entry<String, List<InetAddress>> dcEndpoints : dcsEndpoints.entrySet()) + { + List<InetAddress> dcEndpoint = dcEndpoints.getValue(); + if (readRepair == ReadRepairDecision.DC_LOCAL && dcEndpoints.getKey().equals(DatabaseDescriptor.getLocalDataCenter())) + waitSet.addAll(dcEndpoint); + else + waitSet.addAll(dcEndpoint.subList(0, Math.min(localQuorumFor(keyspace, dcEndpoints.getKey()), dcEndpoint.size()))); + } + + return waitSet; + } + public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddress> liveEndpoints) { switch (this) @@ -282,7 +318,7 @@ public enum ConsistencyLevel int dcBlockFor = localQuorumFor(keyspace, entry.getKey()); int dcLive = entry.getValue(); if (dcLive < dcBlockFor) - throw new UnavailableException(this, dcBlockFor, dcLive); + throw new UnavailableException(this, entry.getKey(), dcBlockFor, dcLive); } break; } @@ -304,8 +340,6 @@ public enum ConsistencyLevel { case ANY: throw new InvalidRequestException("ANY ConsistencyLevel is only supported for writes"); - case EACH_QUORUM: - throw new InvalidRequestException("EACH_QUORUM ConsistencyLevel is only supported for writes"); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc2478d4/src/java/org/apache/cassandra/exceptions/UnavailableException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/UnavailableException.java b/src/java/org/apache/cassandra/exceptions/UnavailableException.java index baee0b2..7b4edd8 100644 --- a/src/java/org/apache/cassandra/exceptions/UnavailableException.java +++ b/src/java/org/apache/cassandra/exceptions/UnavailableException.java @@ -30,6 +30,11 @@ public class UnavailableException extends RequestExecutionException this("Cannot achieve consistency level " + consistency, consistency, required, alive); } + public UnavailableException(ConsistencyLevel consistency, String dc, int required, int alive) + { + this("Cannot achieve consistency level " + consistency + " in DC " + dc, consistency, required, alive); + } + public UnavailableException(String msg, ConsistencyLevel consistency, int required, int alive) { super(ExceptionCode.UNAVAILABLE, msg);