Author: brandonwilliams Date: Tue Feb 1 16:24:56 2011 New Revision: 1066082
URL: http://svn.apache.org/viewvc?rev=1066082&view=rev Log: Add ability to list hosts for which hints are stored, get a total hint count, and delete hints for a given host via JMX. Patch by Jon Hermes, reviewed by brandonwilliams for CASSANDRA-1551 Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1066082&r1=1066081&r2=1066082&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Feb 1 16:24:56 2011 @@ -19,40 +19,43 @@ package org.apache.cassandra.db; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; -import org.apache.cassandra.utils.ByteBufferUtil; +import static com.google.common.base.Charsets.UTF_8; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.MBeanServer; +import javax.management.ObjectName; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.QueryFilter; import 
org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.DigestMismatchException; -import org.apache.cassandra.service.IWriteResponseHandler; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.WriteResponseHandler; -import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.service.*; +import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.WrappedRunnable; import org.cliffc.high_scale_lib.NonBlockingHashSet; -import static com.google.common.base.Charsets.UTF_8; /** * For each endpoint for which we have hints, there is a row in the system hints CF. + * The key for this row is ByteBuffer.wrap(string), i.e. "127.0.0.1". + * * SuperColumns in that row are keys for which we have hinted data. * Subcolumns names within that supercolumn are keyspace+CF, concatenated with SEPARATOR. * Subcolumn values are always empty; instead, we store the row data "normally" @@ -78,19 +81,37 @@ import static com.google.common.base.Cha * that would contain the message bytes. */ -public class HintedHandOffManager +public class HintedHandOffManager implements HintedHandOffManagerMBean { public static final HintedHandOffManager instance = new HintedHandOffManager(); + public static final String HINTS_CF = "HintsColumnFamily"; private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class); - public static final String HINTS_CF = "HintsColumnFamily"; private static final int PAGE_SIZE = 10000; private static final String SEPARATOR = "-"; + private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody. 
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>(); private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", DatabaseDescriptor.getCompactionThreadPriority()); + public HintedHandOffManager() + { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=HintedHandoffManager")); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + public void registerMBean() + { + logger_.debug("Created HHOM instance, registered MBean."); + } + private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, ByteBuffer key) throws IOException { if (!Gossiper.instance.isKnownEndpoint(endpoint)) @@ -142,12 +163,28 @@ public class HintedHandOffManager RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress); rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp); rm.apply(); - } + } + + public void deleteHintsForEndpoint(final String ipOrHostname) + { + try + { + InetAddress endpoint = InetAddress.getByName(ipOrHostname); + deleteHintsForEndpoint(endpoint); + } + catch (UnknownHostException e) + { + logger_.warn("Unable to find "+ipOrHostname+", not a hostname or ipaddr of a node?:"); + e.printStackTrace(); + throw new RuntimeException(e); + } + } - public static void deleteHintsForEndPoint(final InetAddress endpoint) + public void deleteHintsForEndpoint(final InetAddress endpoint) { + final String ipaddr = endpoint.getHostAddress(); final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF); - final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(endpoint.getAddress())); + final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(ipaddr.getBytes())); rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis()); // execute asynchronously to avoid blocking caller (which may be 
processing gossip) @@ -157,14 +194,14 @@ public class HintedHandOffManager { try { - logger_.info("Deleting any stored hints for " + endpoint); + logger_.info("Deleting any stored hints for " + ipaddr); rm.apply(); hintStore.forceFlush(); CompactionManager.instance.submitMajor(hintStore, 0, Integer.MAX_VALUE); } catch (Exception e) { - logger_.warn("Could not delete hints for " + endpoint + ": " + e); + logger_.warn("Could not delete hints for " + ipaddr + ": " + e); } } }; @@ -315,4 +352,62 @@ public class HintedHandOffManager { deliverHints(InetAddress.getByName(to)); } + + public List<String> listEndpointsPendingHints() + { + List<Row> rows = getHintsSlice(1); + + // Extract the keys as strings to be reported. + LinkedList<String> result = new LinkedList<String>(); + for (Row r : rows) + { + if (r.cf != null) //ignore removed rows + result.addFirst(new String(r.key.key.array())); + } + return result; + } + + public Map<String, Integer> countPendingHints() + { + List<Row> rows = getHintsSlice(Integer.MAX_VALUE); + + Map<String, Integer> result = new HashMap<String, Integer>(); + for (Row r : rows) + { + if (r.cf != null) //ignore removed rows + result.put(new String(r.key.key.array()), r.cf.getColumnCount()); + } + return result; + } + + private List<Row> getHintsSlice(int column_count) + { + // ColumnParent for HintsCF... + ColumnParent parent = new ColumnParent(HINTS_CF); + + // Get count # of columns... + SlicePredicate predicate = new SlicePredicate(); + SliceRange sliceRange = new SliceRange(); + sliceRange.setStart(new byte[0]).setFinish(new byte[0]); + sliceRange.setCount(column_count); + predicate.setSlice_range(sliceRange); + + // From keys "" to ""... + IPartitioner partitioner = StorageService.getPartitioner(); + ByteBuffer empty = ByteBufferUtil.EMPTY_BYTE_BUFFER; + Range range = new Range(partitioner.getToken(empty), partitioner.getToken(empty)); + + // Get a bunch of rows! 
+ List<Row> rows; + try + { + rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, LARGE_NUMBER), ConsistencyLevel.ONE); + } + catch (Exception e) + { + logger_.info("HintsCF getEPPendingHints timed out."); + throw new RuntimeException(e); + } + return rows; + } } Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java?rev=1066082&view=auto ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java (added) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java Tue Feb 1 16:24:56 2011 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.util.List; +import java.util.Map; + +public interface HintedHandOffManagerMBean +{ + /** + * Nuke all hints from this node to `ep`. + * @param epaddr String rep. 
of endpoint address to delete hints for, either ip address ("127.0.0.1") or hostname + */ + public void deleteHintsForEndpoint(final String epaddr); + + /** + * List all the endpoints that this node has hints for. + * @return set of endpoints; as Strings + */ + public List<String> listEndpointsPendingHints(); + + /** + * List all the endpoints that this node has hints for, and + * count the number of hints for each such endpoint. + * + * @return map of endpoint -> hint count + */ + public Map<String, Integer> countPendingHints(); +} + Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1066082&r1=1066081&r2=1066082&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Tue Feb 1 16:24:56 2011 @@ -422,6 +422,8 @@ public class StorageService implements I StorageLoadBalancer.instance.startBroadcasting(); MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds()); + HintedHandOffManager.instance.registerMBean(); + if (DatabaseDescriptor.isAutoBootstrap() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) && !SystemTable.isBootstrapped()) @@ -828,7 +830,7 @@ public class StorageService implements I { Gossiper.instance.removeEndpoint(endpoint); tokenMetadata_.removeEndpoint(endpoint); - HintedHandOffManager.deleteHintsForEndPoint(endpoint); + HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint); tokenMetadata_.removeBootstrapToken(token); calculatePendingRanges(); if (!isClientMode)