Author: jbellis
Date: Tue Dec 28 03:17:24 2010
New Revision: 1053245

URL: http://svn.apache.org/viewvc?rev=1053245&view=rev
Log:
merged from 0.6

Added:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java
      - copied, changed from r1053244, 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IMessageCallback.java
Modified:
    cassandra/branches/cassandra-0.7/   (props changed)
    cassandra/branches/cassandra-0.7/CHANGES.txt
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncCallback.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncResult.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java

Propchange: cassandra/branches/cassandra-0.7/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 28 03:17:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-1053218
+/cassandra/branches/cassandra-0.6:922689-1053244
 /cassandra/branches/cassandra-0.7:1035666,1050269
 /cassandra/trunk:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1053245&r1=1053244&r2=1053245&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Dec 28 03:17:24 2010
@@ -11,6 +11,7 @@ dev
  * add OpenBitSet to support larger bloom filters (CASSANDRA-1555)
  * handle URL-specified log4j regression (CASSANDRA-1907)
  * enable keepalive on intra-cluster sockets (CASSANDRA-1766)
+ * count timeouts towards dynamicsnitch latencies (CASSANDRA-1905)
 
 
 0.7.0-rc3

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 28 03:17:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1053218
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1053244
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1035666,1050269
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 28 03:17:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1053218
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1053244
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1035666,1050269
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 28 03:17:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1053218
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1053244
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1035666,1050269
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 28 03:17:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1053218
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1053244
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1035666,1050269
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 28 03:17:24 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1053218
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1053244
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1035666,1050269
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1053245&r1=1053244&r2=1053245&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
 Tue Dec 28 03:17:24 2010
@@ -94,6 +94,8 @@ class AsyncResult implements IAsyncResul
         {
             lock.unlock();
         }        
+
+        MessagingService.removeRegisteredCallback(response.getMessageId());
     }
 
     public InetAddress getFrom()

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncCallback.java?rev=1053245&r1=1053244&r2=1053245&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncCallback.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncCallback.java
 Tue Dec 28 03:17:24 2010
@@ -24,7 +24,7 @@ package org.apache.cassandra.net;
  * service.  In particular, if any shared state is referenced, making
  * response alone synchronized will not suffice.
  */
-public interface IAsyncCallback 
+public interface IAsyncCallback extends IMessageCallback
 {
        /**
         * @param msg response received.

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncResult.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=1053245&r1=1053244&r2=1053245&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncResult.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IAsyncResult.java
 Tue Dec 28 03:17:24 2010
@@ -22,7 +22,7 @@ import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public interface IAsyncResult
+public interface IAsyncResult extends IMessageCallback
 {    
     /**
      * Same operation as the above get() but allows the calling

Copied: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java
 (from r1053244, 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IMessageCallback.java)
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java?p2=cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java&p1=cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IMessageCallback.java&r1=1053244&r2=1053245&rev=1053245&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IMessageCallback.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java
 Tue Dec 28 03:17:24 2010
@@ -3,8 +3,3 @@ package org.apache.cassandra.net;
 public interface IMessageCallback
 {
 }
-package org.apache.cassandra.net;
-
-public interface IMessageCallback
-{
-}

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1053245&r1=1053244&r2=1053245&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
 Tue Dec 28 03:17:24 2010
@@ -36,6 +36,9 @@ import java.util.concurrent.atomic.Atomi
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +46,8 @@ import org.apache.cassandra.concurrent.D
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.service.GCInspector;
@@ -54,7 +59,7 @@ import org.apache.cassandra.utils.GuidGe
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
-public class MessagingService implements MessagingServiceMBean
+public class MessagingService implements MessagingServiceMBean, 
ILatencyPublisher
 {
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is 
appropriate.
@@ -64,9 +69,9 @@ public class MessagingService implements
     public static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* This records all the results mapped by message Id */
-    private static ExpiringMap<String, IAsyncCallback> callbackMap_;
-    private static ExpiringMap<String, IAsyncResult> taskCompletionMap_;
-    
+    private static ExpiringMap<String, IMessageCallback> callbacks;
+    private static Multimap<String, InetAddress> targets;
+
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
@@ -83,6 +88,8 @@ public class MessagingService implements
     private SocketThread socketThread;
     private SimpleCondition listenGate;
     private static final Map<StorageService.Verb, AtomicInteger> 
droppedMessages = new EnumMap<StorageService.Verb, 
AtomicInteger>(StorageService.Verb.class);
+    private final List<ILatencySubscriber> subscribers = new 
ArrayList<ILatencySubscriber>();
+
     static
     {
         for (StorageService.Verb verb : StorageService.Verb.values())
@@ -99,15 +106,6 @@ public class MessagingService implements
     {
         listenGate = new SimpleCondition();
         verbHandlers_ = new EnumMap<StorageService.Verb, 
IVerbHandler>(StorageService.Verb.class);
-        /*
-         * Leave callbacks in the cachetable long enough that any related 
messages will arrive
-         * before the callback is evicted from the table. The concurrency 
level is set at 128
-         * which is the sum of the threads in the pool that adds shit into the 
table and the 
-         * pool that retrives the callback from here.
-        */
-        callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 * 
DatabaseDescriptor.getRpcTimeout()));
-        taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1 
* DatabaseDescriptor.getRpcTimeout()));
-
         streamExecutor_ = new DebuggableThreadPoolExecutor("Streaming", 
DatabaseDescriptor.getCompactionThreadPriority());
         Runnable logDropped = new Runnable()
         {
@@ -118,6 +116,26 @@ public class MessagingService implements
         };
         StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, 
LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 
+        Function<String, ?> timeoutReporter = new Function<String, Object>()
+        {
+            public Object apply(String messageId)
+            {
+                Collection<InetAddress> addresses = 
targets.removeAll(messageId);
+                if (addresses == null)
+                    return null;
+
+                for (InetAddress address : addresses)
+                {
+                    for (ILatencySubscriber subscriber : subscribers)
+                        subscriber.receiveTiming(address, (double) 
DatabaseDescriptor.getRpcTimeout());
+                }
+
+                return null;
+            }
+        };
+        targets = ArrayListMultimap.create();
+        callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * 
DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
@@ -230,6 +248,7 @@ public class MessagingService implements
         addCallback(cb, messageId);
         for (InetAddress endpoint : to)
         {
+            targets.put(messageId, endpoint);
             sendOneWay(message, endpoint);
         }
         return messageId;
@@ -237,7 +256,7 @@ public class MessagingService implements
 
     public void addCallback(IAsyncCallback cb, String messageId)
     {
-        callbackMap_.put(messageId, cb);
+        callbacks.put(messageId, cb);
     }
 
     /**
@@ -254,6 +273,7 @@ public class MessagingService implements
     {        
         String messageId = message.getMessageId();
         addCallback(cb, messageId);
+        targets.put(messageId, to);
         sendOneWay(message, to);
         return messageId;
     }
@@ -280,6 +300,7 @@ public class MessagingService implements
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);
+            targets.put(groupId, to.get(i));
             sendOneWay(messages[i], to.get(i));
         }
         return groupId;
@@ -332,7 +353,8 @@ public class MessagingService implements
     public IAsyncResult sendRR(Message message, InetAddress to)
     {
         IAsyncResult iar = new AsyncResult();
-        taskCompletionMap_.put(message.getMessageId(), iar);
+        callbacks.put(message.getMessageId(), iar);
+        targets.put(message.getMessageId(), to);
         sendOneWay(message, to);
         return iar;
     }
@@ -350,6 +372,11 @@ public class MessagingService implements
         streamExecutor_.execute(new FileStreamTask(header, to));
     }
     
+    public void register(ILatencySubscriber subcriber)
+    {
+        subscribers.add(subcriber);
+    }
+
     /** blocks until the processing pools are empty and done. */
     public static void waitFor() throws InterruptedException
     {
@@ -371,10 +398,7 @@ public class MessagingService implements
         }
 
         streamExecutor_.shutdownNow();
-
-        /* shut down the cachetables */
-        taskCompletionMap_.shutdown();
-        callbackMap_.shutdown();
+        callbacks.shutdown();
 
         logger_.info("Shutdown complete (no further commands will be 
processed)");
     }
@@ -391,29 +415,25 @@ public class MessagingService implements
         stage.execute(runnable);
     }
 
-    public static IAsyncCallback getRegisteredCallback(String messageId)
-    {
-        return callbackMap_.get(messageId);
-    }
-    
-    public static void removeRegisteredCallback(String messageId)
+    public static IMessageCallback getRegisteredCallback(String messageId)
     {
-        callbackMap_.remove(messageId);
+        return callbacks.get(messageId);
     }
     
-    public static IAsyncResult getAsyncResult(String messageId)
+    public static IMessageCallback removeRegisteredCallback(String messageId)
     {
-        return taskCompletionMap_.remove(messageId);
+        targets.removeAll(messageId); // TODO fix this when we clean up quorum 
reads to do proper RR
+        return callbacks.remove(messageId);
     }
 
     public static long getRegisteredCallbackAge(String messageId)
     {
-        return callbackMap_.getAge(messageId);
+        return callbacks.getAge(messageId);
     }
 
-    public static long getAsyncResultAge(String messageId)
+    public static void responseReceivedFrom(String messageId, InetAddress from)
     {
-        return taskCompletionMap_.getAge(messageId);
+        targets.remove(messageId, from);
     }
 
     public static void validateMagic(int magic) throws IOException

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1053245&r1=1053244&r2=1053245&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
 Tue Dec 28 03:17:24 2010
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.net;
 
-
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -37,35 +36,28 @@ public class ResponseVerbHandler impleme
 
     public void doVerb(Message message)
     {     
-        String messageId = message.getMessageId();        
-        IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
-        double age = 0;
-        if (cb != null)
+        String messageId = message.getMessageId();
+        MessagingService.responseReceivedFrom(messageId, message.getFrom());
+        double age = System.currentTimeMillis() - 
MessagingService.getRegisteredCallbackAge(messageId);
+        IMessageCallback cb = 
MessagingService.getRegisteredCallback(messageId);
+        if (cb == null)
+            return;
+
+        // if cb is not null, then age will be valid
+        for (ILatencySubscriber subscriber : subscribers)
+            subscriber.receiveTiming(message.getFrom(), age);
+
+        if (cb instanceof IAsyncCallback)
         {
             if (logger_.isDebugEnabled())
                 logger_.debug("Processing response on a callback from " + 
message.getMessageId() + "@" + message.getFrom());
-            age = System.currentTimeMillis() - 
MessagingService.getRegisteredCallbackAge(messageId);
-            cb.response(message);
+            ((IAsyncCallback) cb).response(message);
         }
         else
         {
-            IAsyncResult ar = MessagingService.getAsyncResult(messageId);
-            if (ar != null)
-            {
-                if (logger_.isDebugEnabled())
-                    logger_.debug("Processing response on an async result from 
" + message.getMessageId() + "@" + message.getFrom());
-                age = System.currentTimeMillis() - 
MessagingService.getAsyncResultAge(messageId);
-                ar.result(message);
-            }
-        }
-        notifySubscribers(message.getFrom(), age);
-    }
-
-    private void notifySubscribers(InetAddress host, double latency)
-    {
-        for (ILatencySubscriber subscriber : subscribers)
-        {
-            subscriber.receiveTiming(host, latency);
+            if (logger_.isDebugEnabled())
+                logger_.debug("Processing response on an async result from " + 
message.getMessageId() + "@" + message.getFrom());
+            ((IAsyncResult) cb).result(message);
         }
     }
 

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1053245&r1=1053244&r2=1053245&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java
 Tue Dec 28 03:17:24 2010
@@ -22,7 +22,9 @@ import java.util.Enumeration;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Callable;
 
+import com.google.common.base.Function;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +33,7 @@ import org.cliffc.high_scale_lib.NonBloc
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(ExpiringMap.class);
+    private final Function<K, ?> postExpireHook;
 
     private static class CacheableObject<T>
     {
@@ -76,6 +79,7 @@ public class ExpiringMap<K, V>
                     if (co != null && co.isReadyToDie(expiration))
                     {
                         cache.remove(key);
+                        postExpireHook.apply(key);
                     }
                 }
             }
@@ -86,12 +90,18 @@ public class ExpiringMap<K, V>
     private final Timer timer;
     private static int counter = 0;
 
-    /*
-    * Specify the TTL for objects in the cache
-    * in milliseconds.
-    */
     public ExpiringMap(long expiration)
     {
+        this(expiration, null);
+    }
+
+    /**
+     *
+     * @param expiration the TTL for objects in the cache in milliseconds
+     */
+    public ExpiringMap(long expiration, Function<K, ?> postExpireHook)
+    {
+        this.postExpireHook = postExpireHook;
         if (expiration <= 0)
         {
             throw new IllegalArgumentException("Argument specified must be a 
positive number");


Reply via email to