Author: brandonwilliams
Date: Tue Feb  1 16:24:56 2011
New Revision: 1066082

URL: http://svn.apache.org/viewvc?rev=1066082&view=rev
Log:
Add ability to list hosts for which hints are stored, get a total hint count,
and delete hints for a given host via JMX.
Patch by Jon Hermes, reviewed by brandonwilliams for CASSANDRA-1551

Added:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
Modified:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1066082&r1=1066081&r2=1066082&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 Tue Feb  1 16:24:56 2011
@@ -19,40 +19,43 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.cassandra.utils.ByteBufferUtil;
+import static com.google.common.base.Charsets.UTF_8;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.DigestMismatchException;
-import org.apache.cassandra.service.IWriteResponseHandler;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
-import static com.google.common.base.Charsets.UTF_8;
 
 
 /**
  * For each endpoint for which we have hints, there is a row in the system 
hints CF.
+ * The key for this row is ByteBuffer.wrap(string), i.e. "127.0.0.1".
+ *
  * SuperColumns in that row are keys for which we have hinted data.
  * Subcolumns names within that supercolumn are keyspace+CF, concatenated with 
SEPARATOR.
  * Subcolumn values are always empty; instead, we store the row data "normally"
@@ -78,19 +81,37 @@ import static com.google.common.base.Cha
  * that would contain the message bytes.
  */
 
-public class HintedHandOffManager
+public class HintedHandOffManager implements HintedHandOffManagerMBean
 {
     public static final HintedHandOffManager instance = new 
HintedHandOffManager();
+    public static final String HINTS_CF = "HintsColumnFamily";
 
     private static final Logger logger_ = 
LoggerFactory.getLogger(HintedHandOffManager.class);
-    public static final String HINTS_CF = "HintsColumnFamily";
     private static final int PAGE_SIZE = 10000;
     private static final String SEPARATOR = "-";
+    private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be 
enough for anybody.
 
     private final NonBlockingHashSet<InetAddress> queuedDeliveries = new 
NonBlockingHashSet<InetAddress>();
 
     private final ExecutorService executor_ = new 
JMXEnabledThreadPoolExecutor("HintedHandoff", 
DatabaseDescriptor.getCompactionThreadPriority());
 
+    public HintedHandOffManager()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.db:type=HintedHandoffManager"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+    public void registerMBean()
+    {
+        logger_.debug("Created HHOM instance, registered MBean.");
+    }
+
     private static boolean sendMessage(InetAddress endpoint, String tableName, 
String cfName, ByteBuffer key) throws IOException
     {
         if (!Gossiper.instance.isKnownEndpoint(endpoint))
@@ -142,12 +163,28 @@ public class HintedHandOffManager
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
         rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp);
         rm.apply();
-    }                                                         
+    }
+
+    public void deleteHintsForEndpoint(final String ipOrHostname)
+    {
+        try
+        {
+            InetAddress endpoint = InetAddress.getByName(ipOrHostname);
+            deleteHintsForEndpoint(endpoint);
+        }
+        catch (UnknownHostException e)
+        {
+            logger_.warn("Unable to find "+ipOrHostname+", not a hostname or 
ipaddr of a node?:");
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+    }
 
-    public static void deleteHintsForEndPoint(final InetAddress endpoint)
+    public void deleteHintsForEndpoint(final InetAddress endpoint)
     {
+        final String ipaddr = endpoint.getHostAddress();
         final ColumnFamilyStore hintStore = 
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
-        final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
ByteBuffer.wrap(endpoint.getAddress()));
+        final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
ByteBuffer.wrap(ipaddr.getBytes()));
         rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
 
         // execute asynchronously to avoid blocking caller (which may be 
processing gossip)
@@ -157,14 +194,14 @@ public class HintedHandOffManager
             {
                 try
                 {
-                    logger_.info("Deleting any stored hints for " + endpoint);
+                    logger_.info("Deleting any stored hints for " + ipaddr);
                     rm.apply();
                     hintStore.forceFlush();
                     CompactionManager.instance.submitMajor(hintStore, 0, 
Integer.MAX_VALUE);
                 }
                 catch (Exception e)
                 {
-                    logger_.warn("Could not delete hints for " + endpoint + ": 
" + e);
+                    logger_.warn("Could not delete hints for " + ipaddr + ": " 
+ e);
                 }
             }
         };
@@ -315,4 +352,62 @@ public class HintedHandOffManager
     {
         deliverHints(InetAddress.getByName(to));
     }
+
+    public List<String> listEndpointsPendingHints()
+    {
+        List<Row> rows = getHintsSlice(1);
+
+        // Extract the keys as strings to be reported.
+        LinkedList<String> result = new LinkedList<String>();
+        for (Row r : rows)
+        {
+            if (r.cf != null) //ignore removed rows
+                result.addFirst(new String(r.key.key.array()));
+        }
+        return result;
+    }
+
+    public Map<String, Integer> countPendingHints()
+    {
+        List<Row> rows = getHintsSlice(Integer.MAX_VALUE);
+
+        Map<String, Integer> result = new HashMap<String, Integer>();
+        for (Row r : rows)
+        {
+            if (r.cf != null) //ignore removed rows
+                result.put(new String(r.key.key.array()), 
r.cf.getColumnCount());
+        }
+        return result;
+    }
+
+    private List<Row> getHintsSlice(int column_count)
+    {
+        // ColumnParent for HintsCF...
+        ColumnParent parent = new ColumnParent(HINTS_CF);
+
+        // Get count # of columns...
+        SlicePredicate predicate = new SlicePredicate();
+        SliceRange sliceRange = new SliceRange();
+        sliceRange.setStart(new byte[0]).setFinish(new byte[0]);
+        sliceRange.setCount(column_count);
+        predicate.setSlice_range(sliceRange);
+
+        // From keys "" to ""...
+        IPartitioner partitioner = StorageService.getPartitioner();
+        ByteBuffer empty = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        Range range = new Range(partitioner.getToken(empty), 
partitioner.getToken(empty));
+
+        // Get a bunch of rows!
+        List<Row> rows;
+        try
+        {
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", 
parent, predicate, range, LARGE_NUMBER), ConsistencyLevel.ONE);
+        }
+        catch (Exception e)
+        {
+            logger_.info("HintsCF getEPPendingHints timed out.");
+            throw new RuntimeException(e);
+        }
+        return rows;
+    }
 }

Added: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java?rev=1066082&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
 Tue Feb  1 16:24:56 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.List;
+import java.util.Map;
+
+public interface HintedHandOffManagerMBean
+{
+    /**
+     * Nuke all hints from this node to `ep`.
+     * @param epaddr String rep. of endpoint address to delete hints for, 
either ip address ("127.0.0.1") or hostname
+     */
+    public void deleteHintsForEndpoint(final String epaddr);
+
+    /**
+     * List all the endpoints that this node has hints for.
+     * @return set of endpoints; as Strings
+     */
+    public List<String> listEndpointsPendingHints();
+
+    /**
+     * List all the endpoints that this node has hints for, and
+     *  count the number of hints for each such endpoint.
+     *
+     * @return map of endpoint -> hint count
+     */
+    public Map<String, Integer> countPendingHints();
+}
+

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1066082&r1=1066081&r2=1066082&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
 Tue Feb  1 16:24:56 2011
@@ -422,6 +422,8 @@ public class StorageService implements I
         StorageLoadBalancer.instance.startBroadcasting();
         MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), 
DatabaseDescriptor.getSeeds());
 
+        HintedHandOffManager.instance.registerMBean();
+
         if (DatabaseDescriptor.isAutoBootstrap()
                 && 
DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
                 && !SystemTable.isBootstrapped())
@@ -828,7 +830,7 @@ public class StorageService implements I
     {
         Gossiper.instance.removeEndpoint(endpoint);
         tokenMetadata_.removeEndpoint(endpoint);
-        HintedHandOffManager.deleteHintsForEndPoint(endpoint);
+        HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
         tokenMetadata_.removeBootstrapToken(token);
         calculatePendingRanges();
         if (!isClientMode)


Reply via email to