Author: jbellis
Date: Mon Dec 27 16:48:16 2010
New Revision: 1053105

URL: http://svn.apache.org/viewvc?rev=1053105&view=rev
Log:
convert ConsistencyChecker to use an executor, as in 0.7, to ease merging of CASSANDRA-1905
patch by jbellis

Removed:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/ICacheExpungeHook.java
Modified:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1053105&r1=1053104&r2=1053105&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
 Mon Dec 27 16:48:16 2010
@@ -26,12 +26,15 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.log4j.Logger;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.ICacheExpungeHook;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
@@ -41,7 +44,7 @@ import org.apache.cassandra.db.Row;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.WrappedRunnable;
 
 /**
  * ConsistencyChecker does the following:
@@ -59,8 +62,9 @@ import org.apache.cassandra.utils.Expiri
  */
 class ConsistencyChecker implements Runnable
 {
-       private static Logger logger_ = 
Logger.getLogger(ConsistencyChecker.class);
-    private static ExpiringMap<String, String> readRepairTable_ = new 
ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout());
+    private static Logger logger_ = 
LoggerFactory.getLogger(ConsistencyChecker.class);
+
+    private static ScheduledExecutorService executor_ = new 
ScheduledThreadPoolExecutor(1); // TODO add JMX
 
     private final Row row_;
     protected final List<InetAddress> replicas_;
@@ -126,7 +130,7 @@ class ConsistencyChecker implements Runn
                     ReadCommand readCommand = constructReadMessage(false);
                     Message message = readCommand.makeReadMessage();
                     if (logger_.isDebugEnabled())
-                        logger_.debug("Digest mismatch; re-reading " + 
readCommand_.key + " from " + message.getMessageId() + "@[" + 
StringUtils.join(replicas_, ", ") + "]");                         
+                        logger_.debug("Digest mismatch; re-reading " + 
readCommand_.key + " from " + message.getMessageId() + "@[" + 
StringUtils.join(replicas_, ", ") + "]");
                     MessagingService.instance.addCallback(new 
DataRepairHandler(), message.getMessageId());
                     for (InetAddress endpoint : replicas_)
                     {
@@ -144,12 +148,12 @@ class ConsistencyChecker implements Runn
         }
     }
 
-    class DataRepairHandler implements IAsyncCallback, 
ICacheExpungeHook<String, String>
+    class DataRepairHandler implements IAsyncCallback
        {
                private final Collection<Message> responses_ = new 
LinkedBlockingQueue<Message>();
                private final ReadResponseResolver readResponseResolver_;
                private final int majority_;
-               
+
         public DataRepairHandler() throws IOException
         {
             readResponseResolver_ = new 
ReadResponseResolver(readCommand_.table, readCommand_.key, replicas_.size());
@@ -170,20 +174,15 @@ class ConsistencyChecker implements Runn
             readResponseResolver_.preprocess(message);
             if (responses_.size() == majority_)
             {
-                String messageId = message.getMessageId();
-                readRepairTable_.put(messageId, messageId, this);
-            }
-        }
-
-               public void callMe(String key, String value)
-               {
-            try
-                       {
-                               readResponseResolver_.resolve(responses_);
-            }
-            catch (Exception ex)
-            {
-                throw new RuntimeException(ex);
+                Runnable runnable = new WrappedRunnable()
+                {
+                    public void runMayThrow() throws IOException, 
DigestMismatchException
+                    {
+                        readResponseResolver_.resolve(responses_);
+                    }
+                };
+                // give remaining replicas until timeout to reply and get 
added to responses_
+                executor_.schedule(runnable, 
DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             }
         }
     }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1053105&r1=1053104&r2=1053105&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
 Mon Dec 27 16:48:16 2010
@@ -18,41 +18,34 @@
 
 package org.apache.cassandra.utils;
 
-import java.util.*;
-import java.util.Map.Entry;
+import java.util.Enumeration;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.ICacheExpungeHook;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class ExpiringMap<K, V>
 {
-    private class CacheableObject
+    private static final Logger logger = 
LoggerFactory.getLogger(ExpiringMap.class);
+
+    private static class CacheableObject<T>
     {
-        private V value_;
-        public long age;
+        private final T value;
+        private final long age;
 
-        CacheableObject(V o)
+        CacheableObject(T o)
         {
-            value_ = o;
+            value = o;
             age = System.currentTimeMillis();
         }
 
-        @Override
-        public boolean equals(Object o)
-        {
-            return value_.equals(o);
-        }
-
-        @Override
-        public int hashCode()
+        T getValue()
         {
-            return value_.hashCode();
-        }
-
-        V getValue()
-        {
-            return value_;
+            return value;
         }
 
         boolean isReadyToDie(long expiration)
@@ -63,70 +56,35 @@ public class ExpiringMap<K, V>
 
     private class CacheMonitor extends TimerTask
     {
-        private long expiration_;
+        private final long expiration;
 
         CacheMonitor(long expiration)
         {
-            expiration_ = expiration;
+            this.expiration = expiration;
         }
 
         @Override
         public void run()
         {
-            Map<K, V> expungedValues = new HashMap<K, V>();
-            synchronized (cache_)
+            synchronized (cache)
             {
-                Enumeration<K> e = cache_.keys();
+                Enumeration<K> e = cache.keys();
                 while (e.hasMoreElements())
                 {
                     K key = e.nextElement();
-                    CacheableObject co = cache_.get(key);
-                    if (co != null && co.isReadyToDie(expiration_))
+                    CacheableObject co = cache.get(key);
+                    if (co != null && co.isReadyToDie(expiration))
                     {
-                        V v = co.getValue();
-                        if (null != v)
-                        {
-                            expungedValues.put(key, v);
-                        }
-                        cache_.remove(key);
+                        cache.remove(key);
                     }
                 }
             }
-
-            /* Calling the hooks on the keys that have been expunged */
-            for (Entry<K, V> entry : expungedValues.entrySet())
-            {
-                K key = entry.getKey();
-                V value = entry.getValue();
-                
-                ICacheExpungeHook<K, V> hook = hooks_.remove(key);
-                if (hook != null)
-                {
-                    hook.callMe(key, value);
-                }
-            }
-            expungedValues.clear();
         }
     }
 
-    private Hashtable<K, CacheableObject> cache_;
-    private Map<K, ICacheExpungeHook<K, V>> hooks_;
-    private Timer timer_;
-    private static int counter_ = 0;
-    private static final Logger LOGGER = Logger.getLogger(ExpiringMap.class);
-
-    private void init(long expiration)
-    {
-        if (expiration <= 0)
-        {
-            throw new IllegalArgumentException("Argument specified must be a 
positive number");
-        }
-
-        cache_ = new Hashtable<K, CacheableObject>();
-        hooks_ = new Hashtable<K, ICacheExpungeHook<K, V>>();
-        timer_ = new Timer("CACHETABLE-TIMER-" + (++counter_), true);
-        timer_.schedule(new CacheMonitor(expiration), expiration, expiration);
-    }
+    private final NonBlockingHashMap<K, CacheableObject> cache = new 
NonBlockingHashMap<K, CacheableObject>();
+    private final Timer timer;
+    private static int counter = 0;
 
     /*
     * Specify the TTL for objects in the cache
@@ -134,29 +92,29 @@ public class ExpiringMap<K, V>
     */
     public ExpiringMap(long expiration)
     {
-        init(expiration);
+        if (expiration <= 0)
+        {
+            throw new IllegalArgumentException("Argument specified must be a 
positive number");
+        }
+
+        timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true);
+        timer.schedule(new CacheMonitor(expiration), expiration / 2, 
expiration / 2);
     }
 
     public void shutdown()
     {
-        timer_.cancel();
+        timer.cancel();
     }
 
     public void put(K key, V value)
     {
-        cache_.put(key, new CacheableObject(value));
-    }
-
-    public void put(K key, V value, ICacheExpungeHook<K, V> hook)
-    {
-        put(key, value);
-        hooks_.put(key, hook);
+        cache.put(key, new CacheableObject<V>(value));
     }
 
     public V get(K key)
     {
         V result = null;
-        CacheableObject co = cache_.get(key);
+        CacheableObject<V> co = cache.get(key);
         if (co != null)
         {
             result = co.getValue();
@@ -166,7 +124,7 @@ public class ExpiringMap<K, V>
 
     public V remove(K key)
     {
-        CacheableObject co = cache_.remove(key);
+        CacheableObject<V> co = cache.remove(key);
         V result = null;
         if (co != null)
         {
@@ -178,7 +136,7 @@ public class ExpiringMap<K, V>
     public long getAge(K key)
     {
         long age = 0;
-        CacheableObject co = cache_.get(key);
+        CacheableObject<V> co = cache.get(key);
         if (co != null)
         {
             age = co.age;
@@ -188,21 +146,21 @@ public class ExpiringMap<K, V>
 
     public int size()
     {
-        return cache_.size();
+        return cache.size();
     }
 
     public boolean containsKey(K key)
     {
-        return cache_.containsKey(key);
+        return cache.containsKey(key);
     }
 
     public boolean isEmpty()
     {
-        return cache_.isEmpty();
+        return cache.isEmpty();
     }
 
     public Set<K> keySet()
     {
-        return cache_.keySet();
+        return cache.keySet();
     }
 }


Reply via email to