Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 f21c88851 -> cc2478d4c


Allow EACH_QUORUM for reads

patch by Carl Yeksigian; reviewed by Ariel Weisberg for CASSANDRA-9602


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc2478d4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc2478d4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc2478d4

Branch: refs/heads/cassandra-3.0
Commit: cc2478d4ce5d0cabb150489ebec5cda1442bc174
Parents: f21c888
Author: Carl Yeksigian <c...@apache.org>
Authored: Tue Oct 6 11:05:04 2015 -0400
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Wed Oct 14 16:16:58 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  1 +
 .../apache/cassandra/db/ConsistencyLevel.java   | 40 ++++++++++++++++++--
 .../exceptions/UnavailableException.java        |  5 +++
 4 files changed, 44 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc2478d4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1a43cfc..6b7b4eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0-rc2
+ * Allow EACH_QUORUM for reads (CASSANDRA-9602)
  * Fix potential ClassCastException while upgrading (CASSANDRA-10468)
  * Fix NPE in MVs on update (CASSANDRA-10503)
  * Only include modified cell data in indexing deltas (CASSANDRA-10438)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc2478d4/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index c36481e..1176dcd 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,7 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - EACH_QUORUM is now a supported consistency level for read requests.
    - Support for IN restrictions on any partition key component or clustering 
key
      as well as support for EQ and IN multicolumn restrictions has been added 
to
      UPDATE and DELETE statement.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc2478d4/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 85ec0f3..c6655dc 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -182,6 +182,11 @@ public enum ConsistencyLevel
 
     public List<InetAddress> filterForQuery(Keyspace keyspace, 
List<InetAddress> liveEndpoints, ReadRepairDecision readRepair)
     {
+        // If we are doing an each quorum, we have to make sure that the 
endpoints we select provide a quorum for each
+        // data center
+        if (this == EACH_QUORUM)
+            return filterForEachQuorum(keyspace, liveEndpoints, readRepair);
+
         /*
          * Endpoints are expected to be restricted to live replicas, sorted by 
snitch preference.
          * For LOCAL_QUORUM, move local-DC replicas in front first as we need 
them there whether
@@ -217,6 +222,37 @@ public enum ConsistencyLevel
         }
     }
 
+    private List<InetAddress> filterForEachQuorum(Keyspace keyspace, 
List<InetAddress> liveEndpoints, ReadRepairDecision readRepair)
+    {
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
keyspace.getReplicationStrategy();
+
+        // quickly drop out if read repair is GLOBAL, since we just use all of 
the live endpoints
+        if (readRepair == ReadRepairDecision.GLOBAL)
+            return liveEndpoints;
+
+        Map<String, List<InetAddress>> dcsEndpoints = new HashMap<>();
+        for (String dc: strategy.getDatacenters())
+            dcsEndpoints.put(dc, new ArrayList<>());
+
+        for (InetAddress add : liveEndpoints)
+        {
+            String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(add);
+            dcsEndpoints.get(dc).add(add);
+        }
+
+        List<InetAddress> waitSet = new ArrayList<>();
+        for (Map.Entry<String, List<InetAddress>> dcEndpoints : 
dcsEndpoints.entrySet())
+        {
+            List<InetAddress> dcEndpoint = dcEndpoints.getValue();
+            if (readRepair == ReadRepairDecision.DC_LOCAL && 
dcEndpoints.getKey().equals(DatabaseDescriptor.getLocalDataCenter()))
+                waitSet.addAll(dcEndpoint);
+            else
+                waitSet.addAll(dcEndpoint.subList(0, 
Math.min(localQuorumFor(keyspace, dcEndpoints.getKey()), dcEndpoint.size())));
+        }
+
+        return waitSet;
+    }
+
     public boolean isSufficientLiveNodes(Keyspace keyspace, 
Iterable<InetAddress> liveEndpoints)
     {
         switch (this)
@@ -282,7 +318,7 @@ public enum ConsistencyLevel
                         int dcBlockFor = localQuorumFor(keyspace, 
entry.getKey());
                         int dcLive = entry.getValue();
                         if (dcLive < dcBlockFor)
-                            throw new UnavailableException(this, dcBlockFor, 
dcLive);
+                            throw new UnavailableException(this, 
entry.getKey(), dcBlockFor, dcLive);
                     }
                     break;
                 }
@@ -304,8 +340,6 @@ public enum ConsistencyLevel
         {
             case ANY:
                 throw new InvalidRequestException("ANY ConsistencyLevel is 
only supported for writes");
-            case EACH_QUORUM:
-                throw new InvalidRequestException("EACH_QUORUM 
ConsistencyLevel is only supported for writes");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc2478d4/src/java/org/apache/cassandra/exceptions/UnavailableException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/UnavailableException.java 
b/src/java/org/apache/cassandra/exceptions/UnavailableException.java
index baee0b2..7b4edd8 100644
--- a/src/java/org/apache/cassandra/exceptions/UnavailableException.java
+++ b/src/java/org/apache/cassandra/exceptions/UnavailableException.java
@@ -30,6 +30,11 @@ public class UnavailableException extends 
RequestExecutionException
         this("Cannot achieve consistency level " + consistency, consistency, 
required, alive);
     }
 
+    public UnavailableException(ConsistencyLevel consistency, String dc, int 
required, int alive)
+    {
+        this("Cannot achieve consistency level " + consistency + " in DC " + 
dc, consistency, required, alive);
+    }
+
     public UnavailableException(String msg, ConsistencyLevel consistency, int 
required, int alive)
     {
         super(ExceptionCode.UNAVAILABLE, msg);

Reply via email to