Author: jbellis
Date: Mon Dec 27 16:48:16 2010
New Revision: 1053105

URL: http://svn.apache.org/viewvc?rev=1053105&view=rev
Log:
convert ConsistencyChecker to use an executor as in 0.7 to ease merging of CASSANDRA-1905
patch by jbellis

Removed: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/ICacheExpungeHook.java Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1053105&r1=1053104&r2=1053105&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Mon Dec 27 16:48:16 2010 @@ -26,12 +26,15 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; -import org.apache.log4j.Logger; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.ICacheExpungeHook; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; @@ -41,7 +44,7 @@ import org.apache.cassandra.db.Row; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.ExpiringMap; +import org.apache.cassandra.utils.WrappedRunnable; /** * ConsistencyChecker does the following: @@ -59,8 +62,9 @@ import org.apache.cassandra.utils.Expiri */ class ConsistencyChecker 
implements Runnable { - private static Logger logger_ = Logger.getLogger(ConsistencyChecker.class); - private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout()); + private static Logger logger_ = LoggerFactory.getLogger(ConsistencyChecker.class); + + private static ScheduledExecutorService executor_ = new ScheduledThreadPoolExecutor(1); // TODO add JMX private final Row row_; protected final List<InetAddress> replicas_; @@ -126,7 +130,7 @@ class ConsistencyChecker implements Runn ReadCommand readCommand = constructReadMessage(false); Message message = readCommand.makeReadMessage(); if (logger_.isDebugEnabled()) - logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); + logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); MessagingService.instance.addCallback(new DataRepairHandler(), message.getMessageId()); for (InetAddress endpoint : replicas_) { @@ -144,12 +148,12 @@ class ConsistencyChecker implements Runn } } - class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String> + class DataRepairHandler implements IAsyncCallback { private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>(); private final ReadResponseResolver readResponseResolver_; private final int majority_; - + public DataRepairHandler() throws IOException { readResponseResolver_ = new ReadResponseResolver(readCommand_.table, readCommand_.key, replicas_.size()); @@ -170,20 +174,15 @@ class ConsistencyChecker implements Runn readResponseResolver_.preprocess(message); if (responses_.size() == majority_) { - String messageId = message.getMessageId(); - readRepairTable_.put(messageId, messageId, this); - } - } - - public void callMe(String key, String value) - { - try - { - 
readResponseResolver_.resolve(responses_); - } - catch (Exception ex) - { - throw new RuntimeException(ex); + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws IOException, DigestMismatchException + { + readResponseResolver_.resolve(responses_); + } + }; + // give remaining replicas until timeout to reply and get added to responses_ + executor_.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); } } } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1053105&r1=1053104&r2=1053105&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java Mon Dec 27 16:48:16 2010 @@ -18,41 +18,34 @@ package org.apache.cassandra.utils; -import java.util.*; -import java.util.Map.Entry; +import java.util.Enumeration; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.ICacheExpungeHook; +import org.cliffc.high_scale_lib.NonBlockingHashMap; public class ExpiringMap<K, V> { - private class CacheableObject + private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class); + + private static class CacheableObject<T> { - private V value_; - public long age; + private final T value; + private final long age; - CacheableObject(V o) + CacheableObject(T o) { - value_ = o; + value = o; age = System.currentTimeMillis(); } - @Override - public boolean equals(Object o) - { - return value_.equals(o); - } - - @Override - public int hashCode() + T getValue() { - return value_.hashCode(); - } - - 
V getValue() - { - return value_; + return value; } boolean isReadyToDie(long expiration) @@ -63,70 +56,35 @@ public class ExpiringMap<K, V> private class CacheMonitor extends TimerTask { - private long expiration_; + private final long expiration; CacheMonitor(long expiration) { - expiration_ = expiration; + this.expiration = expiration; } @Override public void run() { - Map<K, V> expungedValues = new HashMap<K, V>(); - synchronized (cache_) + synchronized (cache) { - Enumeration<K> e = cache_.keys(); + Enumeration<K> e = cache.keys(); while (e.hasMoreElements()) { K key = e.nextElement(); - CacheableObject co = cache_.get(key); - if (co != null && co.isReadyToDie(expiration_)) + CacheableObject co = cache.get(key); + if (co != null && co.isReadyToDie(expiration)) { - V v = co.getValue(); - if (null != v) - { - expungedValues.put(key, v); - } - cache_.remove(key); + cache.remove(key); } } } - - /* Calling the hooks on the keys that have been expunged */ - for (Entry<K, V> entry : expungedValues.entrySet()) - { - K key = entry.getKey(); - V value = entry.getValue(); - - ICacheExpungeHook<K, V> hook = hooks_.remove(key); - if (hook != null) - { - hook.callMe(key, value); - } - } - expungedValues.clear(); } } - private Hashtable<K, CacheableObject> cache_; - private Map<K, ICacheExpungeHook<K, V>> hooks_; - private Timer timer_; - private static int counter_ = 0; - private static final Logger LOGGER = Logger.getLogger(ExpiringMap.class); - - private void init(long expiration) - { - if (expiration <= 0) - { - throw new IllegalArgumentException("Argument specified must be a positive number"); - } - - cache_ = new Hashtable<K, CacheableObject>(); - hooks_ = new Hashtable<K, ICacheExpungeHook<K, V>>(); - timer_ = new Timer("CACHETABLE-TIMER-" + (++counter_), true); - timer_.schedule(new CacheMonitor(expiration), expiration, expiration); - } + private final NonBlockingHashMap<K, CacheableObject> cache = new NonBlockingHashMap<K, CacheableObject>(); + private final Timer 
timer; + private static int counter = 0; /* * Specify the TTL for objects in the cache @@ -134,29 +92,29 @@ public class ExpiringMap<K, V> */ public ExpiringMap(long expiration) { - init(expiration); + if (expiration <= 0) + { + throw new IllegalArgumentException("Argument specified must be a positive number"); + } + + timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true); + timer.schedule(new CacheMonitor(expiration), expiration / 2, expiration / 2); } public void shutdown() { - timer_.cancel(); + timer.cancel(); } public void put(K key, V value) { - cache_.put(key, new CacheableObject(value)); - } - - public void put(K key, V value, ICacheExpungeHook<K, V> hook) - { - put(key, value); - hooks_.put(key, hook); + cache.put(key, new CacheableObject<V>(value)); } public V get(K key) { V result = null; - CacheableObject co = cache_.get(key); + CacheableObject<V> co = cache.get(key); if (co != null) { result = co.getValue(); @@ -166,7 +124,7 @@ public class ExpiringMap<K, V> public V remove(K key) { - CacheableObject co = cache_.remove(key); + CacheableObject<V> co = cache.remove(key); V result = null; if (co != null) { @@ -178,7 +136,7 @@ public class ExpiringMap<K, V> public long getAge(K key) { long age = 0; - CacheableObject co = cache_.get(key); + CacheableObject<V> co = cache.get(key); if (co != null) { age = co.age; @@ -188,21 +146,21 @@ public class ExpiringMap<K, V> public int size() { - return cache_.size(); + return cache.size(); } public boolean containsKey(K key) { - return cache_.containsKey(key); + return cache.containsKey(key); } public boolean isEmpty() { - return cache_.isEmpty(); + return cache.isEmpty(); } public Set<K> keySet() { - return cache_.keySet(); + return cache.keySet(); } }