Show Effective Owership
patch by Vijay; reviewed by Brandon Williams for CASSANDRA-3412


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0dea8dc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0dea8dc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0dea8dc5

Branch: refs/heads/trunk
Commit: 0dea8dc56b170e8ad0d2b69a74a2f7b3e2284770
Parents: ddee43e
Author: Vijay Parthasarathy <vijay2...@gmail.com>
Authored: Fri Feb 17 14:56:37 2012 -0800
Committer: Vijay Parthasarathy <vijay2...@gmail.com>
Committed: Fri Feb 17 14:56:37 2012 -0800

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/service/StorageService.java   |   65 +++++++++++++++
 .../cassandra/service/StorageServiceMBean.java     |    9 ++
 src/java/org/apache/cassandra/tools/NodeCmd.java   |   31 +++++--
 src/java/org/apache/cassandra/tools/NodeProbe.java |    7 ++-
 5 files changed, 104 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dea8dc5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3207ea..8303f10 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,7 @@
  * Avoid NPE on aboarted stream-out sessions (CASSANDRA-3904)
  * BulkRecordWriter throws NPE for counter columns (CASSANDRA-3906)
  * Support compression using BulkWriter (CASSANDRA-3907)
+ * Show Effective Owership via Nodetool ring <keyspace> (CASSANDRA-3412)
 
 1.0.8
  * fix race between cleanup and flush on secondary index CFSes (CASSANDRA-3712)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dea8dc5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index ed33b44..93d398a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -26,12 +26,15 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.base.Supplier;
 import com.google.common.collect.*;
+
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -935,6 +938,25 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         return rangeToEndpointMap;
     }
 
+    private Map<InetAddress, Collection<Range<Token>>> 
constructEndpointToRangeMap(String keyspace)
+    {
+        Multimap<InetAddress, Range<Token>> endpointToRangeMap = 
Multimaps.newListMultimap(new HashMap<InetAddress, Collection<Range<Token>>>(), 
new Supplier<List<Range<Token>>>()
+        {
+            public List<Range<Token>> get()
+            {
+                return Lists.newArrayList();
+            }
+        });
+        
+        List<Range<Token>> ranges = 
getAllRanges(tokenMetadata_.sortedTokens());
+        for (Range<Token> range : ranges)
+        {
+            for (InetAddress endpoint : 
Table.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.left))
+                endpointToRangeMap.put(endpoint, range);
+        }
+        return endpointToRangeMap.asMap();
+    }
+
     /*
      * Handle the reception of a new particular ApplicationState for a 
particular endpoint. Note that the value of the
      * ApplicationState has not necessarily "changed" since the last known 
value, if we already received the same update
@@ -2614,6 +2636,49 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         return string_map;
     }
 
+    public Map<String, Float> effectiveOwnership(String keyspace) throws 
ConfigurationException
+    {   
+        Map<String, Float> effective = Maps.newHashMap();
+        if (Schema.instance.getNonSystemTables().size() <= 0)
+            throw new ConfigurationException("Couldn't find any Non System 
Keyspaces to infer replication topology");
+        if (keyspace == null && 
!hasSameReplication(Schema.instance.getNonSystemTables()))
+            throw new ConfigurationException("Non System keyspaces doesnt have 
the same topology");
+        
+        if (keyspace == null)
+            keyspace = Schema.instance.getNonSystemTables().get(0);
+        
+        List<Token> sortedTokens = new 
ArrayList<Token>(tokenMetadata_.getTokenToEndpointMapForReading().keySet());
+        Collections.sort(sortedTokens);
+        Map<Token, Float> ownership = 
getPartitioner().describeOwnership(sortedTokens);
+        
+        for (Entry<InetAddress, Collection<Range<Token>>> ranges : 
constructEndpointToRangeMap(keyspace).entrySet())
+        {
+            Token token = tokenMetadata_.getToken(ranges.getKey());
+            for (Range<Token> range: ranges.getValue())
+            {
+                float value = effective.get(token.toString()) == null ? 0.0F : 
effective.get(token.toString());
+                effective.put(token.toString(), value + 
ownership.get(range.left));
+            }
+        }
+        return effective;
+    }
+    
+    private boolean hasSameReplication(List<String> list)
+    {
+        if (list.isEmpty())
+            return false;
+        for (int i = 0; i < list.size() -1; i++)
+        {
+            KSMetaData ksm1 = 
Schema.instance.getKSMetaData(Schema.instance.getNonSystemTables().get(i));
+            KSMetaData ksm2 = 
Schema.instance.getKSMetaData(Schema.instance.getNonSystemTables().get(i + 1));
+            if (!ksm1.strategyClass.equals(ksm2.strategyClass) || 
+                    
!Iterators.elementsEqual(ksm1.strategyOptions.entrySet().iterator(), 
+                                             
ksm2.strategyOptions.entrySet().iterator()))
+                return false;
+        }
+        return true;
+    }
+
     public List<String> getKeyspaces()
     {
         List<String> tableslist = new 
ArrayList<String>(Schema.instance.getTables());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dea8dc5/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c5aa9fd..69b4969 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -299,6 +299,15 @@ public interface StorageServiceMBean
      */
     public Map<String, Float> getOwnership();
 
+    /**
+     * Effective ownership is % of the data each node owns given the keyspace 
+     * we calculate the percentage using replication factor.
+     * If Keyspace == null, this method will try to verify if all the keyspaces
+     * in the cluster have the same replication strategies and if yes then we 
will
+     * use the first else a empty Map is returned.
+     */
+    public Map<String, Float> effectiveOwnership(String keyspace) throws 
ConfigurationException;
+
     public List<String> getKeyspaces();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dea8dc5/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java 
b/src/java/org/apache/cassandra/tools/NodeCmd.java
index e5ed0b7..c216560 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -205,7 +205,7 @@ public class NodeCmd
      * 
      * @param outs the stream to write to
      */
-    public void printRing(PrintStream outs)
+    public void printRing(PrintStream outs, String keyspace)
     {
         Map<String, String> tokenToEndpoint = probe.getTokenToEndpointMap();
         List<String> sortedTokens = new 
ArrayList<String>(tokenToEndpoint.keySet());
@@ -217,16 +217,27 @@ public class NodeCmd
         Collection<String> movingNodes = probe.getMovingNodes();
         Map<String, String> loadMap = probe.getLoadMap();
 
-        String format = "%-16s%-12s%-12s%-7s%-8s%-16s%-8s%-44s%n";
-        outs.printf(format, "Address", "DC", "Rack", "Status", "State", 
"Load", "Owns", "Token");
+        String format = "%-16s%-12s%-12s%-7s%-8s%-16s%-20s%-44s%n";
+        
+        // Calculate per-token ownership of the ring
+        Map<String, Float> ownerships;
+        try
+        {
+            ownerships = probe.effectiveOwnership(keyspace);
+            outs.printf(format, "Address", "DC", "Rack", "Status", "State", 
"Load", "Effective-Owership", "Token");
+        }
+        catch (ConfigurationException ex)
+        {
+            ownerships = probe.getOwnership();
+            outs.printf("Note: Ownership information does not include 
topology, please specify a keyspace. \n");
+            outs.printf(format, "Address", "DC", "Rack", "Status", "State", 
"Load", "Owns", "Token");
+        }
+        
         // show pre-wrap token twice so you can always read a node's range as
         // (previous line token, current line token]
         if (sortedTokens.size() > 1)
             outs.printf(format, "", "", "", "", "", "", "", 
sortedTokens.get(sortedTokens.size() - 1));
 
-        // Calculate per-token ownership of the ring
-        Map<String, Float> ownerships = probe.getOwnership();
-
         for (String token : sortedTokens)
         {
             String primaryEndpoint = tokenToEndpoint.get(token);
@@ -266,7 +277,7 @@ public class NodeCmd
             String load = loadMap.containsKey(primaryEndpoint)
                           ? loadMap.get(primaryEndpoint)
                           : "?";
-            String owns = new 
DecimalFormat("##0.00%").format(ownerships.get(token));
+            String owns = new 
DecimalFormat("##0.00%").format(ownerships.get(token) == null ? 0.0F : 
ownerships.get(token));
             outs.printf(format, primaryEndpoint, dataCenter, rack, status, 
state, load, owns, token);
         }
     }
@@ -657,7 +668,11 @@ public class NodeCmd
 
             switch (command)
             {
-                case RING            : nodeCmd.printRing(System.out); break;
+                case RING : 
+                    if (arguments.length > 0) { nodeCmd.printRing(System.out, 
arguments[0]); }
+                    else                      { nodeCmd.printRing(System.out, 
null); };
+                    break;
+
                 case INFO            : nodeCmd.printInfo(System.out); break;
                 case CFSTATS         : 
nodeCmd.printColumnFamilyStats(System.out); break;
                 case DECOMMISSION    : probe.decommission(); break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dea8dc5/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 8739745..dca07bf 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -266,7 +266,12 @@ public class NodeProbe
     {
         return ssProxy.getOwnership();
     }
-
+    
+    public Map<String, Float> effectiveOwnership(String keyspace) throws 
ConfigurationException
+    {
+        return ssProxy.effectiveOwnership(keyspace);
+    }
+    
     public CacheServiceMBean getCacheServiceMBean()
     {
         String cachePath = "org.apache.cassandra.db:type=Caches";

Reply via email to