Author: iocanel
Date: Fri Oct 21 21:14:51 2011
New Revision: 1187543

URL: http://svn.apache.org/viewvc?rev=1187543&view=rev
Log:
[DIRECTMEMORY-16] Added non singleton implementation for Cache and Memory 
Manager. Added interfaces for CacheService and MemoryManagerService.

Added:
    
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java
    
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
   (contents, props changed)
      - copied, changed from r1186494, 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
    
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
    
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
Modified:
    
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
    
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
    
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java
    
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java
    
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java
    
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java

Modified: 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
 (original)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
 Fri Oct 21 21:14:51 2011
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMa
 import org.apache.directmemory.measures.Every;
 import org.apache.directmemory.measures.Ram;
 import org.apache.directmemory.memory.MemoryManager;
+import org.apache.directmemory.memory.MemoryManagerService;
 import org.apache.directmemory.memory.OffHeapMemoryBuffer;
 import org.apache.directmemory.memory.Pointer;
 import org.apache.directmemory.misc.Format;
@@ -40,205 +41,106 @@ import com.google.common.collect.MapMake
 
 public class Cache {
 
-       private static Logger logger = 
LoggerFactory.getLogger(MemoryManager.class);
-       private static ConcurrentMap<String, Pointer> map;
-       
-       public static int DEFAULT_CONCURRENCY_LEVEL = 4;
-       public static int DEFAULT_INITIAL_CAPACITY = 100000;
-       
-       public static Serializer serializer = new ProtoStuffSerializerV1();
+       private static Logger logger = LoggerFactory.getLogger(Cache.class);
+  private static MemoryManagerService memoryManager = 
MemoryManager.getMemoryManager();
+  private static CacheService cacheService = new 
CacheServiceImpl(memoryManager);
 
        private Cache() {
                // not instantiable
        }
-       
-       
        private final static Timer timer = new Timer();
 
-    public static void scheduleDisposalEvery(long l) {
-        timer.schedule(new TimerTask() {
-            public void run() {
-                                logger.info("begin scheduled disposal");
-                                collectExpired();
-                                collectLFU();
-                                logger.info("scheduled disposal complete");
-            }
-        }, l);
-        logger.info("disposal scheduled every " + l + " milliseconds");
+    public static void scheduleDisposalEvery(long l){
+      cacheService.scheduleDisposalEvery(l);
     }  
 
        public static void init(int numberOfBuffers, int size, int 
initialCapacity, int concurrencyLevel) {
-               map = new MapMaker()
-                       .concurrencyLevel(concurrencyLevel)
-                       .initialCapacity(initialCapacity)
-                       .makeMap();
-
-               logger.info("*** initializing 
*******************************\r\n" + Format.logo());
-               logger.info("************************************************");
-               MemoryManager.init(numberOfBuffers, size);
-               logger.info("initialized");
-               logger.info(Format.it("number of buffer(s): \t%1d  with %2s 
each", numberOfBuffers, Ram.inMb(size)));
-               logger.info(Format.it("initial capacity: \t%1d", 
initialCapacity));
-               logger.info(Format.it("concurrency level: \t%1d", 
concurrencyLevel));
-               scheduleDisposalEvery(Every.seconds(10));
+               
cacheService.init(numberOfBuffers,size,initialCapacity,concurrencyLevel);
        }
 
        public static void init(int numberOfBuffers, int size) {
-               init(numberOfBuffers, size, DEFAULT_INITIAL_CAPACITY, 
DEFAULT_CONCURRENCY_LEVEL);
+               init(numberOfBuffers, size, 
CacheService.DEFAULT_INITIAL_CAPACITY, CacheService.DEFAULT_CONCURRENCY_LEVEL);
        }
 
        public static Pointer putByteArray(String key, byte[] payload, int 
expiresIn) {
-               Pointer ptr = MemoryManager.store(payload, expiresIn);
-               map.put(key, ptr);
-               return ptr;
+               return cacheService.putByteArray(key,payload,expiresIn);
        }
        
        public static Pointer putByteArray(String key, byte[] payload) {
-               return putByteArray(key, payload, 0);
+               return cacheService.putByteArray(key,payload);
        }
        
        public static Pointer put(String key, Object object) {
-               return put(key, object, 0);
+               return cacheService.put(key,object);
        }
        
        public static Pointer put(String key, Object object, int expiresIn) {
-               try {
-                       byte[] payload = serializer.serialize(object, 
object.getClass());
-                       Pointer ptr = putByteArray(key, payload, expiresIn);
-                       ptr.clazz = object.getClass();
-                       return ptr; 
-               } catch (IOException e) {
-                       logger.error(e.getMessage());
-                       return null;
-               }
+         return cacheService.put(key,object,expiresIn);
        }
        
        public static Pointer updateByteArray(String key, byte[] payload) {
-               Pointer p = map.get(key);
-               p = MemoryManager.update(p, payload);
-               return p;
+               return cacheService.updateByteArray(key, payload);
        }
        
        public static Pointer update(String key, Object object) {
-               Pointer p = map.get(key);
-               try {
-                       p = MemoryManager.update(p, 
serializer.serialize(object, object.getClass()));
-                       p.clazz = object.getClass();
-                       return p;
-               } catch (IOException e) {
-                       logger.error(e.getMessage());
-                       return null;
-               }
+               return cacheService.update(key, object);
        }
        
        public static byte[] retrieveByteArray(String key) {
-               Pointer ptr = getPointer(key);
-               if (ptr == null) return null;
-               if (ptr.expired() || ptr.free) {
-                       map.remove(key);
-                       if (!ptr.free) { 
-                               MemoryManager.free(ptr);
-                       }
-                       return null;
-               } else {
-                       return MemoryManager.retrieve(ptr);
-               }
+               return cacheService.retrieveByteArray(key);
        }
        
        public static Object retrieve(String key) {
-               Pointer ptr = getPointer(key);
-               if (ptr == null) return null;
-               if (ptr.expired() || ptr.free) {
-                       map.remove(key);
-                       if (!ptr.free) { 
-                               MemoryManager.free(ptr);
-                       }
-                       return null;
-               } else {
-                       try {
-                               return 
serializer.deserialize(MemoryManager.retrieve(ptr),ptr.clazz);
-                       } catch (EOFException e) {
-                               logger.error(e.getMessage());
-                       } catch (IOException e) {
-                               logger.error(e.getMessage());
-                       } catch (ClassNotFoundException e) {
-                               logger.error(e.getMessage());
-                       } catch (InstantiationException e) {
-                               logger.error(e.getMessage());
-                       } catch (IllegalAccessException e) {
-                               logger.error(e.getMessage());
-                       }
-               }
-               return null;
+               return cacheService.retrieve(key);
        }
        
        public static Pointer getPointer(String key) {
-               return map.get(key);
+               return cacheService.getPointer(key);
        }
        
        public static void free(String key) {
-               Pointer p = map.remove(key);
-               if (p != null) {
-                       MemoryManager.free(p);
-               }
+               cacheService.free(key);
        }
        
        public static void free(Pointer pointer) {
-               MemoryManager.free(pointer);
+               cacheService.free(pointer);
        }
        
        public static void collectExpired() {
-               MemoryManager.collectExpired();
-               // still have to look for orphan (storing references to freed 
pointers) map entries
+               cacheService.collectExpired();
        }
        
        public static void collectLFU() {
-               MemoryManager.collectLFU();
-               // can possibly clear one whole buffer if it's too fragmented - 
investigate
+               cacheService.collectLFU();
        }
        
        public static void collectAll() {
-                Thread thread = new Thread(){
-                        public void run(){
-                                logger.info("begin disposal");
-                                collectExpired();
-                                collectLFU();
-                                logger.info("disposal complete");
-                        }
-                };
-                thread.start();
+                cacheService.collectAll();
        }
        
        
        public static void clear() {
-               map.clear();
-               MemoryManager.clear();
-               logger.info("Cache cleared");
+               cacheService.clear();
        }
        
        public static long entries() {
-               return map.size();
+               return cacheService.entries();
        }
 
        private static void dump(OffHeapMemoryBuffer mem) {
-               logger.info(Format.it("off-heap - buffer: \t%1d", 
mem.bufferNumber));
-               logger.info(Format.it("off-heap - allocated: \t%1s", 
Ram.inMb(mem.capacity())));
-               logger.info(Format.it("off-heap - used:      \t%1s", 
Ram.inMb(mem.used())));
-               logger.info(Format.it("heap     - max: \t%1s", 
Ram.inMb(Runtime.getRuntime().maxMemory())));
-               logger.info(Format.it("heap     - allocated: \t%1s", 
Ram.inMb(Runtime.getRuntime().totalMemory())));
-               logger.info(Format.it("heap     - free : \t%1s", 
Ram.inMb(Runtime.getRuntime().freeMemory())));
-               logger.info("************************************************");
+               cacheService.dump(mem);
        }
        
        public static void dump() {
-               if (!logger.isInfoEnabled())
-                       return;
-               
-               logger.info("*** DirectMemory statistics ********************");
-               
-               for (OffHeapMemoryBuffer mem : MemoryManager.buffers) {
-                       dump(mem);
-               }
+        cacheService.dump();
        }
+
+  public static Serializer getSerializer() {
+    return cacheService.getSerializer();
+  }
+
+  public static MemoryManagerService getMemoryManager(){
+    return cacheService.getMemoryManager();
+  }
        
 }

Added: 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java?rev=1187543&view=auto
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java
 (added)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java
 Fri Oct 21 21:14:51 2011
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.directmemory.cache;
+
+import java.util.concurrent.ConcurrentMap;
+import org.apache.directmemory.memory.MemoryManagerService;
+import org.apache.directmemory.memory.OffHeapMemoryBuffer;
+import org.apache.directmemory.memory.Pointer;
+import org.apache.directmemory.serialization.Serializer;
+
+public interface CacheService {
+
+  public static int DEFAULT_CONCURRENCY_LEVEL = 4;
+  public static int DEFAULT_INITIAL_CAPACITY = 100000;
+
+  public void init(int numberOfBuffers, int size, int initialCapacity, int 
concurrencyLevel);
+
+  public void init(int numberOfBuffers, int size);
+
+  public void scheduleDisposalEvery(long l);
+
+  public Pointer putByteArray(String key, byte[] payload, int expiresIn);
+
+  public Pointer putByteArray(String key, byte[] payload);
+
+  public Pointer put(String key, Object object);
+
+  public Pointer put(String key, Object object, int expiresIn);
+
+  public Pointer updateByteArray(String key, byte[] payload);
+
+  public Pointer update(String key, Object object);
+
+  public byte[] retrieveByteArray(String key);
+
+  public Object retrieve(String key);
+
+  public Pointer getPointer(String key);
+
+  public void free(String key);
+
+  public void free(Pointer pointer);
+
+  public void collectExpired();
+
+  public void collectLFU();
+
+  public void collectAll();
+
+
+  public void clear();
+
+  public long entries();
+
+  public void dump(OffHeapMemoryBuffer mem);
+
+  public void dump();
+
+  public ConcurrentMap<String, Pointer> getMap();
+
+  public void setMap(ConcurrentMap<String, Pointer> map);
+
+  public Serializer getSerializer();
+
+  public MemoryManagerService getMemoryManager();
+
+  public void setMemoryManager(MemoryManagerService memoryManager);
+
+}

Copied: 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
 (from r1186494, 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java)
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java?p2=incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java&p1=incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java&r1=1186494&r2=1187543&rev=1187543&view=diff
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
 (original)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
 Fri Oct 21 21:14:51 2011
@@ -1,5 +1,3 @@
-package org.apache.directmemory.cache;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -9,7 +7,7 @@ package org.apache.directmemory.cache;
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *  http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -19,15 +17,19 @@ package org.apache.directmemory.cache;
  * under the License.
  */
 
+package org.apache.directmemory.cache;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentMap;
-
+import com.google.common.collect.MapMaker;
 import org.apache.directmemory.measures.Every;
 import org.apache.directmemory.measures.Ram;
 import org.apache.directmemory.memory.MemoryManager;
+import org.apache.directmemory.memory.MemoryManagerService;
+import org.apache.directmemory.memory.MemoryManagerServiceImpl;
 import org.apache.directmemory.memory.OffHeapMemoryBuffer;
 import org.apache.directmemory.memory.Pointer;
 import org.apache.directmemory.misc.Format;
@@ -36,209 +38,240 @@ import org.apache.directmemory.serializa
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.MapMaker;
+public class CacheServiceImpl implements CacheService {
+
+  private static Logger logger = 
LoggerFactory.getLogger(CacheServiceImpl.class);
+  private ConcurrentMap<String, Pointer> map;
 
-public class Cache {
+  private Serializer serializer = new ProtoStuffSerializerV1();
+  private MemoryManagerService memoryManager = new MemoryManagerServiceImpl();
 
-       private static Logger logger = 
LoggerFactory.getLogger(MemoryManager.class);
-       private static ConcurrentMap<String, Pointer> map;
-       
-       public static int DEFAULT_CONCURRENCY_LEVEL = 4;
-       public static int DEFAULT_INITIAL_CAPACITY = 100000;
-       
-       public static Serializer serializer = new ProtoStuffSerializerV1();
-
-       private Cache() {
-               // not instantiable
-       }
-       
-       
-       private final static Timer timer = new Timer();
-
-    public static void scheduleDisposalEvery(long l) {
-        timer.schedule(new TimerTask() {
-            public void run() {
-                                logger.info("begin scheduled disposal");
-                                collectExpired();
-                                collectLFU();
-                                logger.info("scheduled disposal complete");
-            }
-        }, l);
-        logger.info("disposal scheduled every " + l + " milliseconds");
-    }  
-
-       public static void init(int numberOfBuffers, int size, int 
initialCapacity, int concurrencyLevel) {
-               map = new MapMaker()
-                       .concurrencyLevel(concurrencyLevel)
-                       .initialCapacity(initialCapacity)
-                       .makeMap();
-
-               logger.info("*** initializing 
*******************************\r\n" + Format.logo());
-               logger.info("************************************************");
-               MemoryManager.init(numberOfBuffers, size);
-               logger.info("initialized");
-               logger.info(Format.it("number of buffer(s): \t%1d  with %2s 
each", numberOfBuffers, Ram.inMb(size)));
-               logger.info(Format.it("initial capacity: \t%1d", 
initialCapacity));
-               logger.info(Format.it("concurrency level: \t%1d", 
concurrencyLevel));
-               scheduleDisposalEvery(Every.seconds(10));
-       }
-
-       public static void init(int numberOfBuffers, int size) {
-               init(numberOfBuffers, size, DEFAULT_INITIAL_CAPACITY, 
DEFAULT_CONCURRENCY_LEVEL);
-       }
-
-       public static Pointer putByteArray(String key, byte[] payload, int 
expiresIn) {
-               Pointer ptr = MemoryManager.store(payload, expiresIn);
-               map.put(key, ptr);
-               return ptr;
-       }
-       
-       public static Pointer putByteArray(String key, byte[] payload) {
-               return putByteArray(key, payload, 0);
-       }
-       
-       public static Pointer put(String key, Object object) {
-               return put(key, object, 0);
-       }
-       
-       public static Pointer put(String key, Object object, int expiresIn) {
-               try {
-                       byte[] payload = serializer.serialize(object, 
object.getClass());
-                       Pointer ptr = putByteArray(key, payload, expiresIn);
-                       ptr.clazz = object.getClass();
-                       return ptr; 
-               } catch (IOException e) {
-                       logger.error(e.getMessage());
-                       return null;
-               }
-       }
-       
-       public static Pointer updateByteArray(String key, byte[] payload) {
-               Pointer p = map.get(key);
-               p = MemoryManager.update(p, payload);
-               return p;
-       }
-       
-       public static Pointer update(String key, Object object) {
-               Pointer p = map.get(key);
-               try {
-                       p = MemoryManager.update(p, 
serializer.serialize(object, object.getClass()));
-                       p.clazz = object.getClass();
-                       return p;
-               } catch (IOException e) {
-                       logger.error(e.getMessage());
-                       return null;
-               }
-       }
-       
-       public static byte[] retrieveByteArray(String key) {
-               Pointer ptr = getPointer(key);
-               if (ptr == null) return null;
-               if (ptr.expired() || ptr.free) {
-                       map.remove(key);
-                       if (!ptr.free) { 
-                               MemoryManager.free(ptr);
-                       }
-                       return null;
-               } else {
-                       return MemoryManager.retrieve(ptr);
-               }
-       }
-       
-       public static Object retrieve(String key) {
-               Pointer ptr = getPointer(key);
-               if (ptr == null) return null;
-               if (ptr.expired() || ptr.free) {
-                       map.remove(key);
-                       if (!ptr.free) { 
-                               MemoryManager.free(ptr);
-                       }
-                       return null;
-               } else {
-                       try {
-                               return 
serializer.deserialize(MemoryManager.retrieve(ptr),ptr.clazz);
-                       } catch (EOFException e) {
-                               logger.error(e.getMessage());
-                       } catch (IOException e) {
-                               logger.error(e.getMessage());
-                       } catch (ClassNotFoundException e) {
-                               logger.error(e.getMessage());
-                       } catch (InstantiationException e) {
-                               logger.error(e.getMessage());
-                       } catch (IllegalAccessException e) {
-                               logger.error(e.getMessage());
-                       }
-               }
-               return null;
-       }
-       
-       public static Pointer getPointer(String key) {
-               return map.get(key);
-       }
-       
-       public static void free(String key) {
-               Pointer p = map.remove(key);
-               if (p != null) {
-                       MemoryManager.free(p);
-               }
-       }
-       
-       public static void free(Pointer pointer) {
-               MemoryManager.free(pointer);
-       }
-       
-       public static void collectExpired() {
-               MemoryManager.collectExpired();
-               // still have to look for orphan (storing references to freed 
pointers) map entries
-       }
-       
-       public static void collectLFU() {
-               MemoryManager.collectLFU();
-               // can possibly clear one whole buffer if it's too fragmented - 
investigate
-       }
-       
-       public static void collectAll() {
-                Thread thread = new Thread(){
-                        public void run(){
-                                logger.info("begin disposal");
-                                collectExpired();
-                                collectLFU();
-                                logger.info("disposal complete");
-                        }
-                };
-                thread.start();
-       }
-       
-       
-       public static void clear() {
-               map.clear();
-               MemoryManager.clear();
-               logger.info("Cache cleared");
-       }
-       
-       public static long entries() {
-               return map.size();
-       }
-
-       private static void dump(OffHeapMemoryBuffer mem) {
-               logger.info(Format.it("off-heap - buffer: \t%1d", 
mem.bufferNumber));
-               logger.info(Format.it("off-heap - allocated: \t%1s", 
Ram.inMb(mem.capacity())));
-               logger.info(Format.it("off-heap - used:      \t%1s", 
Ram.inMb(mem.used())));
-               logger.info(Format.it("heap     - max: \t%1s", 
Ram.inMb(Runtime.getRuntime().maxMemory())));
-               logger.info(Format.it("heap     - allocated: \t%1s", 
Ram.inMb(Runtime.getRuntime().totalMemory())));
-               logger.info(Format.it("heap     - free : \t%1s", 
Ram.inMb(Runtime.getRuntime().freeMemory())));
-               logger.info("************************************************");
-       }
-       
-       public static void dump() {
-               if (!logger.isInfoEnabled())
-                       return;
-               
-               logger.info("*** DirectMemory statistics ********************");
-               
-               for (OffHeapMemoryBuffer mem : MemoryManager.buffers) {
-                       dump(mem);
-               }
-       }
-       
+  private final Timer timer = new Timer();
+
+
+  /**
+   * Constructor
+   */
+  public CacheServiceImpl() {
+  }
+
+  /**
+   * Constructor
+   *
+   * @param memoryManager
+   */
+  public CacheServiceImpl(MemoryManagerService memoryManager) {
+    this.memoryManager = memoryManager;
+  }
+
+
+  public void scheduleDisposalEvery(long l) {
+    timer.schedule(new TimerTask() {
+      public void run() {
+        logger.info("begin scheduled disposal");
+        collectExpired();
+        collectLFU();
+        logger.info("scheduled disposal complete");
+      }
+    }, l);
+    logger.info("disposal scheduled every " + l + " milliseconds");
+  }
+
+  public void init(int numberOfBuffers, int size, int initialCapacity, int 
concurrencyLevel) {
+    map = new MapMaker()
+            .concurrencyLevel(concurrencyLevel)
+            .initialCapacity(initialCapacity)
+            .makeMap();
+
+    logger.info("*** initializing *******************************\r\n" + 
Format.logo());
+    logger.info("************************************************");
+    memoryManager.init(numberOfBuffers, size);
+    logger.info("initialized");
+    logger.info(Format.it("number of buffer(s): \t%1d  with %2s each", 
numberOfBuffers, Ram.inMb(size)));
+    logger.info(Format.it("initial capacity: \t%1d", initialCapacity));
+    logger.info(Format.it("concurrency level: \t%1d", concurrencyLevel));
+    scheduleDisposalEvery(Every.seconds(10));
+  }
+
+  public void init(int numberOfBuffers, int size) {
+    init(numberOfBuffers, size, DEFAULT_INITIAL_CAPACITY, 
DEFAULT_CONCURRENCY_LEVEL);
+  }
+
+  public Pointer putByteArray(String key, byte[] payload, int expiresIn) {
+    Pointer ptr = memoryManager.store(payload, expiresIn);
+    map.put(key, ptr);
+    return ptr;
+  }
+
+  public Pointer putByteArray(String key, byte[] payload) {
+    return putByteArray(key, payload, 0);
+  }
+
+  public Pointer put(String key, Object object) {
+    return put(key, object, 0);
+  }
+
+  public Pointer put(String key, Object object, int expiresIn) {
+    try {
+      byte[] payload = serializer.serialize(object, object.getClass());
+      Pointer ptr = putByteArray(key, payload, expiresIn);
+      ptr.clazz = object.getClass();
+      return ptr;
+    } catch (IOException e) {
+      logger.error(e.getMessage());
+      return null;
+    }
+  }
+
+  public Pointer updateByteArray(String key, byte[] payload) {
+    Pointer p = map.get(key);
+    p = MemoryManager.update(p, payload);
+    return p;
+  }
+
+  public Pointer update(String key, Object object) {
+    Pointer p = map.get(key);
+    try {
+      p = MemoryManager.update(p, serializer.serialize(object, 
object.getClass()));
+      p.clazz = object.getClass();
+      return p;
+    } catch (IOException e) {
+      logger.error(e.getMessage());
+      return null;
+    }
+  }
+
+  public byte[] retrieveByteArray(String key) {
+    Pointer ptr = getPointer(key);
+    if (ptr == null) return null;
+    if (ptr.expired() || ptr.free) {
+      map.remove(key);
+      if (!ptr.free) {
+        MemoryManager.free(ptr);
+      }
+      return null;
+    } else {
+      return MemoryManager.retrieve(ptr);
+    }
+  }
+
+  public Object retrieve(String key) {
+    Pointer ptr = getPointer(key);
+    if (ptr == null) return null;
+    if (ptr.expired() || ptr.free) {
+      map.remove(key);
+      if (!ptr.free) {
+        MemoryManager.free(ptr);
+      }
+      return null;
+    } else {
+      try {
+        return serializer.deserialize(MemoryManager.retrieve(ptr), ptr.clazz);
+      } catch (EOFException e) {
+        logger.error(e.getMessage());
+      } catch (IOException e) {
+        logger.error(e.getMessage());
+      } catch (ClassNotFoundException e) {
+        logger.error(e.getMessage());
+      } catch (InstantiationException e) {
+        logger.error(e.getMessage());
+      } catch (IllegalAccessException e) {
+        logger.error(e.getMessage());
+      }
+    }
+    return null;
+  }
+
+  public Pointer getPointer(String key) {
+    return map.get(key);
+  }
+
+  public void free(String key) {
+    Pointer p = map.remove(key);
+    if (p != null) {
+      MemoryManager.free(p);
+    }
+  }
+
+  public void free(Pointer pointer) {
+    MemoryManager.free(pointer);
+  }
+
+  public void collectExpired() {
+    MemoryManager.collectExpired();
+    // still have to look for orphan (storing references to freed pointers) 
map entries
+  }
+
+  public void collectLFU() {
+    MemoryManager.collectLFU();
+    // can possibly clear one whole buffer if it's too fragmented - investigate
+  }
+
+  public void collectAll() {
+    Thread thread = new Thread() {
+      public void run() {
+        logger.info("begin disposal");
+        collectExpired();
+        collectLFU();
+        logger.info("disposal complete");
+      }
+    };
+    thread.start();
+  }
+
+
+  public void clear() {
+    map.clear();
+    MemoryManager.clear();
+    logger.info("Cache cleared");
+  }
+
+  public long entries() {
+    return map.size();
+  }
+
+  public void dump(OffHeapMemoryBuffer mem) {
+    logger.info(Format.it("off-heap - buffer: \t%1d", mem.bufferNumber));
+    logger.info(Format.it("off-heap - allocated: \t%1s", 
Ram.inMb(mem.capacity())));
+    logger.info(Format.it("off-heap - used:      \t%1s", 
Ram.inMb(mem.used())));
+    logger.info(Format.it("heap        - max: \t%1s", 
Ram.inMb(Runtime.getRuntime().maxMemory())));
+    logger.info(Format.it("heap     - allocated: \t%1s", 
Ram.inMb(Runtime.getRuntime().totalMemory())));
+    logger.info(Format.it("heap     - free : \t%1s", 
Ram.inMb(Runtime.getRuntime().freeMemory())));
+    logger.info("************************************************");
+  }
+
+  public void dump() {
+    if (!logger.isInfoEnabled())
+      return;
+
+    logger.info("*** DirectMemory statistics ********************");
+
+    for (OffHeapMemoryBuffer mem : memoryManager.getBuffers()) {
+      dump(mem);
+    }
+  }
+
+  public ConcurrentMap<String, Pointer> getMap() {
+    return map;
+  }
+
+  public void setMap(ConcurrentMap<String, Pointer> map) {
+    this.map = map;
+  }
+
+  public Serializer getSerializer() {
+    return serializer;
+  }
+
+  public void setSerializer(Serializer serializer) {
+    this.serializer = serializer;
+  }
+
+  public MemoryManagerService getMemoryManager() {
+    return memoryManager;
+  }
+
+  public void setMemoryManager(MemoryManagerService memoryManager) {
+    this.memoryManager = memoryManager;
+  }
 }

Propchange: 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
 (original)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
 Fri Oct 21 21:14:51 2011
@@ -20,42 +20,23 @@ package org.apache.directmemory.memory;
  */
 
 import java.util.List;
-import java.util.Vector;
-
-import org.apache.directmemory.measures.Ram;
-import org.apache.directmemory.misc.Format;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MemoryManager {
        private static Logger logger = 
LoggerFactory.getLogger(MemoryManager.class);
-       public static List<OffHeapMemoryBuffer> buffers = new 
Vector<OffHeapMemoryBuffer>();
-       public static OffHeapMemoryBuffer activeBuffer = null;
+       private static MemoryManagerService memoryManager = new 
MemoryManagerServiceImpl();;
        
        private MemoryManager() {
                //static class
        }
        
        public static void init(int numberOfBuffers, int size) {
-               for (int i = 0; i < numberOfBuffers; i++) {
-                       buffers.add(OffHeapMemoryBuffer.createNew(size, i));
-               }
-               activeBuffer = buffers.get(0);
-               logger.info(Format.it("MemoryManager initialized - %d buffers, 
%s each", numberOfBuffers, Ram.inMb(size)));
+    memoryManager.init(numberOfBuffers,size);
        }
        
        public static Pointer store(byte[] payload, int expiresIn) {
-               Pointer p = activeBuffer.store(payload, expiresIn);
-               if (p == null) {
-                       if (activeBuffer.bufferNumber+1 == buffers.size()) {
-                               return null;
-                       } else {
-                               // try next buffer
-                               activeBuffer = 
buffers.get(activeBuffer.bufferNumber+1);
-                               p = activeBuffer.store(payload, expiresIn);
-                       }
-               }
-               return p;
+               return memoryManager.store(payload,expiresIn);
        }
        
        public static Pointer store(byte[] payload) {
@@ -63,54 +44,43 @@ public class MemoryManager {
        }
        
        public static Pointer update(Pointer pointer, byte[] payload) {
-               Pointer p = activeBuffer.update(pointer, payload);
-               if (p == null) {
-                       if (activeBuffer.bufferNumber == buffers.size()) {
-                               return null;
-                       } else {
-                               // try next buffer
-                               activeBuffer = 
buffers.get(activeBuffer.bufferNumber+1);
-                               p = activeBuffer.store(payload);
-                       }
-               }
-               return p;
+               return memoryManager.update(pointer,payload);
        }
        
        public static byte[] retrieve(Pointer pointer) {
-               return buffers.get(pointer.bufferNumber).retrieve(pointer);
+               return memoryManager.retrieve(pointer);
        }
        
        public static void free(Pointer pointer) {
-               buffers.get(pointer.bufferNumber).free(pointer);
+               memoryManager.free(pointer);
        }
        
        public static void clear() {
-               for (OffHeapMemoryBuffer buffer : buffers) {
-                       buffer.clear();
-               }
-               activeBuffer = buffers.get(0);
+               memoryManager.clear();
        }
        
        public static long capacity() {
-               long totalCapacity = 0;
-               for (OffHeapMemoryBuffer buffer : buffers) {
-                       totalCapacity += buffer.capacity();
-               }
-               return totalCapacity;
+               return memoryManager.capacity();
        }
 
        public static long collectExpired() {
-               long disposed = 0;
-               for (OffHeapMemoryBuffer buffer : buffers) {
-                       disposed += buffer.collectExpired();
-               }
-               return disposed;
+               return memoryManager.collectExpired();
        }
 
        public static void collectLFU() {
-               for (OffHeapMemoryBuffer buf : MemoryManager.buffers) {
-                       buf.collectLFU(-1);
-               }
+               memoryManager.collectLFU();
        }
 
+   public static List<OffHeapMemoryBuffer> getBuffers() {
+    return memoryManager.getBuffers();
+  }
+
+
+  public static OffHeapMemoryBuffer getActiveBuffer() {
+    return memoryManager.getActiveBuffer();
+  }
+
+  public static MemoryManagerService getMemoryManager() {
+    return memoryManager;
+  }
 }

Added: 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java?rev=1187543&view=auto
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
 (added)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
 Fri Oct 21 21:14:51 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.directmemory.memory;
+
+
+import java.util.List;
+
+public interface MemoryManagerService {
+
+  public void init(int numberOfBuffers, int size);
+
+  public Pointer store(byte[] payload, int expiresIn);
+
+  public Pointer store(byte[] payload);
+
+  public Pointer update(Pointer pointer, byte[] payload);
+
+  public byte[] retrieve(Pointer pointer);
+
+  public void free(Pointer pointer);
+
+  public void clear();
+
+  public long capacity();
+
+  public long collectExpired();
+
+  public void collectLFU();
+
+  public List<OffHeapMemoryBuffer> getBuffers();
+
+  public OffHeapMemoryBuffer getActiveBuffer();
+
+}

Added: 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java?rev=1187543&view=auto
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
 (added)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
 Fri Oct 21 21:14:51 2011
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.directmemory.memory;
+
+import java.util.List;
+import java.util.Vector;
+import org.apache.directmemory.measures.Ram;
+import org.apache.directmemory.misc.Format;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryManagerServiceImpl implements MemoryManagerService {
+
+  private static Logger logger = LoggerFactory.getLogger(MemoryManager.class);
+  public List<OffHeapMemoryBuffer> buffers = new Vector<OffHeapMemoryBuffer>();
+  public OffHeapMemoryBuffer activeBuffer = null;
+
+  public MemoryManagerServiceImpl() {
+  }
+
+  public void init(int numberOfBuffers, int size) {
+    for (int i = 0; i < numberOfBuffers; i++) {
+      buffers.add(OffHeapMemoryBuffer.createNew(size, i));
+    }
+    activeBuffer = buffers.get(0);
+    logger.info(Format.it("MemoryManager initialized - %d buffers, %s each", 
numberOfBuffers, Ram.inMb(size)));
+  }
+
+  public Pointer store(byte[] payload, int expiresIn) {
+    Pointer p = activeBuffer.store(payload, expiresIn);
+    if (p == null) {
+      if (activeBuffer.bufferNumber + 1 == buffers.size()) {
+        return null;
+      } else {
+        // try next buffer
+        activeBuffer = buffers.get(activeBuffer.bufferNumber + 1);
+        p = activeBuffer.store(payload, expiresIn);
+      }
+    }
+    return p;
+  }
+
+  public Pointer store(byte[] payload) {
+    return store(payload, 0);
+  }
+
+  public Pointer update(Pointer pointer, byte[] payload) {
+    Pointer p = activeBuffer.update(pointer, payload);
+    if (p == null) {
+      if (activeBuffer.bufferNumber == buffers.size()) {
+        return null;
+      } else {
+        // try next buffer
+        activeBuffer = buffers.get(activeBuffer.bufferNumber + 1);
+        p = activeBuffer.store(payload);
+      }
+    }
+    return p;
+  }
+
+  public byte[] retrieve(Pointer pointer) {
+    return buffers.get(pointer.bufferNumber).retrieve(pointer);
+  }
+
+  public void free(Pointer pointer) {
+    buffers.get(pointer.bufferNumber).free(pointer);
+  }
+
+  public void clear() {
+    for (OffHeapMemoryBuffer buffer : buffers) {
+      buffer.clear();
+    }
+    activeBuffer = buffers.get(0);
+  }
+
+  public long capacity() {
+    long totalCapacity = 0;
+    for (OffHeapMemoryBuffer buffer : buffers) {
+      totalCapacity += buffer.capacity();
+    }
+    return totalCapacity;
+  }
+
+  public long collectExpired() {
+    long disposed = 0;
+    for (OffHeapMemoryBuffer buffer : buffers) {
+      disposed += buffer.collectExpired();
+    }
+    return disposed;
+  }
+
+  public void collectLFU() {
+    for (OffHeapMemoryBuffer buf : buffers) {
+      buf.collectLFU(-1);
+    }
+  }
+
+  public List<OffHeapMemoryBuffer> getBuffers() {
+    return buffers;
+  }
+
+  public void setBuffers(List<OffHeapMemoryBuffer> buffers) {
+    this.buffers = buffers;
+  }
+
+  public OffHeapMemoryBuffer getActiveBuffer() {
+    return activeBuffer;
+  }
+
+  public void setActiveBuffer(OffHeapMemoryBuffer activeBuffer) {
+    this.activeBuffer = activeBuffer;
+  }
+}

Modified: 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java
 (original)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java
 Fri Oct 21 21:14:51 2011
@@ -191,7 +191,7 @@ public class ConcurrentTests2 {
        @AfterClass
        public static void dump() {
                
-               for (OffHeapMemoryBuffer mem : MemoryManager.buffers) {
+               for (OffHeapMemoryBuffer mem : MemoryManager.getBuffers()) {
                        dump(mem);
                }
                

Modified: 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java
 (original)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java
 Fri Oct 21 21:14:51 2011
@@ -222,7 +222,7 @@ public class ConcurrentTests3 {
        @AfterClass
        public static void dump() {
                
-               for (OffHeapMemoryBuffer mem : MemoryManager.buffers) {
+               for (OffHeapMemoryBuffer mem : MemoryManager.getBuffers()) {
                        dump(mem);
                }
                

Modified: 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java
 (original)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java
 Fri Oct 21 21:14:51 2011
@@ -58,9 +58,9 @@ public class MemoryManagerTests {
                logger.info("stored");
                assertNotNull(p);
                assertEquals(size,p.end);
-               assertEquals(size, MemoryManager.activeBuffer.used());
+               assertEquals(size, MemoryManager.getActiveBuffer().used());
                MemoryManager.free(p);
-               assertEquals(0, MemoryManager.activeBuffer.used());             
+               assertEquals(0, MemoryManager.getActiveBuffer().used());
                logger.info("end");
        }
 
@@ -85,7 +85,7 @@ public class MemoryManagerTests {
        
        @Test
        public void readTest() {
-               for (OffHeapMemoryBuffer buffer : MemoryManager.buffers) {
+               for (OffHeapMemoryBuffer buffer : MemoryManager.getBuffers()) {
                        for (Pointer ptr : buffer.pointers) {
                                if (!ptr.free) {
                                        byte[] res = 
MemoryManager.retrieve(ptr);

Modified: 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java
URL: 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
--- 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java
 (original)
+++ 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java
 Fri Oct 21 21:14:51 2011
@@ -106,7 +106,7 @@ public class Starter {
                logger.info("...done in " + (System.currentTimeMillis() - 
start) + " msecs.");
                logger.info("---------------------------------");
 
-               for (OffHeapMemoryBuffer buf : MemoryManager.buffers) {
+               for (OffHeapMemoryBuffer buf : MemoryManager.getBuffers()) {
                        dump(buf);
                }
        }


Reply via email to