Author: iocanel
Date: Fri Oct 21 21:14:51 2011
New Revision: 1187543
URL: http://svn.apache.org/viewvc?rev=1187543&view=rev
Log:
[DIRECTMEMORY-16] Added non-singleton implementations of Cache and
MemoryManager. Added the CacheService and MemoryManagerService interfaces.
Added:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
(contents, props changed)
- copied, changed from r1186494,
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
Modified:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java
Modified:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
(original)
+++
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
Fri Oct 21 21:14:51 2011
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMa
import org.apache.directmemory.measures.Every;
import org.apache.directmemory.measures.Ram;
import org.apache.directmemory.memory.MemoryManager;
+import org.apache.directmemory.memory.MemoryManagerService;
import org.apache.directmemory.memory.OffHeapMemoryBuffer;
import org.apache.directmemory.memory.Pointer;
import org.apache.directmemory.misc.Format;
@@ -40,205 +41,106 @@ import com.google.common.collect.MapMake
public class Cache {
- private static Logger logger =
LoggerFactory.getLogger(MemoryManager.class);
- private static ConcurrentMap<String, Pointer> map;
-
- public static int DEFAULT_CONCURRENCY_LEVEL = 4;
- public static int DEFAULT_INITIAL_CAPACITY = 100000;
-
- public static Serializer serializer = new ProtoStuffSerializerV1();
+ private static Logger logger = LoggerFactory.getLogger(Cache.class);
+ private static MemoryManagerService memoryManager =
MemoryManager.getMemoryManager();
+ private static CacheService cacheService = new
CacheServiceImpl(memoryManager);
private Cache() {
// not instantiable
}
-
-
private final static Timer timer = new Timer();
- public static void scheduleDisposalEvery(long l) {
- timer.schedule(new TimerTask() {
- public void run() {
- logger.info("begin scheduled disposal");
- collectExpired();
- collectLFU();
- logger.info("scheduled disposal complete");
- }
- }, l);
- logger.info("disposal scheduled every " + l + " milliseconds");
+ public static void scheduleDisposalEvery(long l){
+ cacheService.scheduleDisposalEvery(l);
}
public static void init(int numberOfBuffers, int size, int
initialCapacity, int concurrencyLevel) {
- map = new MapMaker()
- .concurrencyLevel(concurrencyLevel)
- .initialCapacity(initialCapacity)
- .makeMap();
-
- logger.info("*** initializing
*******************************\r\n" + Format.logo());
- logger.info("************************************************");
- MemoryManager.init(numberOfBuffers, size);
- logger.info("initialized");
- logger.info(Format.it("number of buffer(s): \t%1d with %2s
each", numberOfBuffers, Ram.inMb(size)));
- logger.info(Format.it("initial capacity: \t%1d",
initialCapacity));
- logger.info(Format.it("concurrency level: \t%1d",
concurrencyLevel));
- scheduleDisposalEvery(Every.seconds(10));
+
cacheService.init(numberOfBuffers,size,initialCapacity,concurrencyLevel);
}
public static void init(int numberOfBuffers, int size) {
- init(numberOfBuffers, size, DEFAULT_INITIAL_CAPACITY,
DEFAULT_CONCURRENCY_LEVEL);
+ init(numberOfBuffers, size,
CacheService.DEFAULT_INITIAL_CAPACITY, CacheService.DEFAULT_CONCURRENCY_LEVEL);
}
public static Pointer putByteArray(String key, byte[] payload, int
expiresIn) {
- Pointer ptr = MemoryManager.store(payload, expiresIn);
- map.put(key, ptr);
- return ptr;
+ return cacheService.putByteArray(key,payload,expiresIn);
}
public static Pointer putByteArray(String key, byte[] payload) {
- return putByteArray(key, payload, 0);
+ return cacheService.putByteArray(key,payload);
}
public static Pointer put(String key, Object object) {
- return put(key, object, 0);
+ return cacheService.put(key,object);
}
public static Pointer put(String key, Object object, int expiresIn) {
- try {
- byte[] payload = serializer.serialize(object,
object.getClass());
- Pointer ptr = putByteArray(key, payload, expiresIn);
- ptr.clazz = object.getClass();
- return ptr;
- } catch (IOException e) {
- logger.error(e.getMessage());
- return null;
- }
+ return cacheService.put(key,object,expiresIn);
}
public static Pointer updateByteArray(String key, byte[] payload) {
- Pointer p = map.get(key);
- p = MemoryManager.update(p, payload);
- return p;
+ return cacheService.updateByteArray(key, payload);
}
public static Pointer update(String key, Object object) {
- Pointer p = map.get(key);
- try {
- p = MemoryManager.update(p,
serializer.serialize(object, object.getClass()));
- p.clazz = object.getClass();
- return p;
- } catch (IOException e) {
- logger.error(e.getMessage());
- return null;
- }
+ return cacheService.update(key, object);
}
public static byte[] retrieveByteArray(String key) {
- Pointer ptr = getPointer(key);
- if (ptr == null) return null;
- if (ptr.expired() || ptr.free) {
- map.remove(key);
- if (!ptr.free) {
- MemoryManager.free(ptr);
- }
- return null;
- } else {
- return MemoryManager.retrieve(ptr);
- }
+ return cacheService.retrieveByteArray(key);
}
public static Object retrieve(String key) {
- Pointer ptr = getPointer(key);
- if (ptr == null) return null;
- if (ptr.expired() || ptr.free) {
- map.remove(key);
- if (!ptr.free) {
- MemoryManager.free(ptr);
- }
- return null;
- } else {
- try {
- return
serializer.deserialize(MemoryManager.retrieve(ptr),ptr.clazz);
- } catch (EOFException e) {
- logger.error(e.getMessage());
- } catch (IOException e) {
- logger.error(e.getMessage());
- } catch (ClassNotFoundException e) {
- logger.error(e.getMessage());
- } catch (InstantiationException e) {
- logger.error(e.getMessage());
- } catch (IllegalAccessException e) {
- logger.error(e.getMessage());
- }
- }
- return null;
+ return cacheService.retrieve(key);
}
public static Pointer getPointer(String key) {
- return map.get(key);
+ return cacheService.getPointer(key);
}
public static void free(String key) {
- Pointer p = map.remove(key);
- if (p != null) {
- MemoryManager.free(p);
- }
+ cacheService.free(key);
}
public static void free(Pointer pointer) {
- MemoryManager.free(pointer);
+ cacheService.free(pointer);
}
public static void collectExpired() {
- MemoryManager.collectExpired();
- // still have to look for orphan (storing references to freed
pointers) map entries
+ cacheService.collectExpired();
}
public static void collectLFU() {
- MemoryManager.collectLFU();
- // can possibly clear one whole buffer if it's too fragmented -
investigate
+ cacheService.collectLFU();
}
public static void collectAll() {
- Thread thread = new Thread(){
- public void run(){
- logger.info("begin disposal");
- collectExpired();
- collectLFU();
- logger.info("disposal complete");
- }
- };
- thread.start();
+ cacheService.collectAll();
}
public static void clear() {
- map.clear();
- MemoryManager.clear();
- logger.info("Cache cleared");
+ cacheService.clear();
}
public static long entries() {
- return map.size();
+ return cacheService.entries();
}
private static void dump(OffHeapMemoryBuffer mem) {
- logger.info(Format.it("off-heap - buffer: \t%1d",
mem.bufferNumber));
- logger.info(Format.it("off-heap - allocated: \t%1s",
Ram.inMb(mem.capacity())));
- logger.info(Format.it("off-heap - used: \t%1s",
Ram.inMb(mem.used())));
- logger.info(Format.it("heap - max: \t%1s",
Ram.inMb(Runtime.getRuntime().maxMemory())));
- logger.info(Format.it("heap - allocated: \t%1s",
Ram.inMb(Runtime.getRuntime().totalMemory())));
- logger.info(Format.it("heap - free : \t%1s",
Ram.inMb(Runtime.getRuntime().freeMemory())));
- logger.info("************************************************");
+ cacheService.dump(mem);
}
public static void dump() {
- if (!logger.isInfoEnabled())
- return;
-
- logger.info("*** DirectMemory statistics ********************");
-
- for (OffHeapMemoryBuffer mem : MemoryManager.buffers) {
- dump(mem);
- }
+ cacheService.dump();
}
+
+ public static Serializer getSerializer() {
+ return cacheService.getSerializer();
+ }
+
+ public static MemoryManagerService getMemoryManager(){
+ return cacheService.getMemoryManager();
+ }
}
Added:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java?rev=1187543&view=auto
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java
(added)
+++
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheService.java
Fri Oct 21 21:14:51 2011
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.directmemory.cache;
+
+import java.util.concurrent.ConcurrentMap;
+import org.apache.directmemory.memory.MemoryManagerService;
+import org.apache.directmemory.memory.OffHeapMemoryBuffer;
+import org.apache.directmemory.memory.Pointer;
+import org.apache.directmemory.serialization.Serializer;
+
+/**
+ * Instance-level cache API: maps String keys to {@link Pointer}s handed out by a
+ * {@link MemoryManagerService}. The static Cache facade forwards its calls to an
+ * implementation of this interface (CacheServiceImpl).
+ */
+public interface CacheService {
+
+ /** Concurrency level used for the key map when {@link #init(int, int)} is called. */
+ public static int DEFAULT_CONCURRENCY_LEVEL = 4;
+ /** Initial capacity used for the key map when {@link #init(int, int)} is called. */
+ public static int DEFAULT_INITIAL_CAPACITY = 100000;
+
+ /**
+ * Prepares the cache for use. CacheServiceImpl creates the key map with the given
+ * initial capacity and concurrency level, initializes the memory manager with
+ * numberOfBuffers buffers of the given size, and schedules a disposal pass.
+ * NOTE(review): size is logged through Ram.inMb, so it is presumably in bytes - confirm.
+ */
+ public void init(int numberOfBuffers, int size, int initialCapacity, int
concurrencyLevel);
+
+ /** Same as the four-argument init, using the two DEFAULT_* constants. */
+ public void init(int numberOfBuffers, int size);
+
+ /**
+ * Schedules a background disposal pass (collectExpired followed by collectLFU)
+ * with a delay of l milliseconds.
+ */
+ public void scheduleDisposalEvery(long l);
+
+ /**
+ * Stores the payload through the memory manager and registers the returned pointer
+ * under key. NOTE(review): the unit of expiresIn is not visible here - confirm.
+ */
+ public Pointer putByteArray(String key, byte[] payload, int expiresIn);
+
+ /** Stores the payload with expiresIn = 0. */
+ public Pointer putByteArray(String key, byte[] payload);
+
+ /** Serializes and stores the object with expiresIn = 0. */
+ public Pointer put(String key, Object object);
+
+ /**
+ * Serializes the object, stores the bytes under key and records the object's class
+ * on the pointer. CacheServiceImpl returns null when serialization throws IOException.
+ */
+ public Pointer put(String key, Object object, int expiresIn);
+
+ /** Replaces the bytes behind the pointer registered under key. */
+ public Pointer updateByteArray(String key, byte[] payload);
+
+ /**
+ * Serializes the object and replaces the bytes behind the pointer registered under
+ * key. CacheServiceImpl returns null when serialization throws IOException.
+ */
+ public Pointer update(String key, Object object);
+
+ /**
+ * Returns the stored bytes for key. CacheServiceImpl returns null when the key is
+ * unknown or its pointer is expired or free (the map entry is removed in that case).
+ */
+ public byte[] retrieveByteArray(String key);
+
+ /**
+ * Returns the deserialized object for key, or null under the same conditions as
+ * retrieveByteArray or when deserialization fails.
+ */
+ public Object retrieve(String key);
+
+ /** Returns the pointer registered under key, or null when there is none. */
+ public Pointer getPointer(String key);
+
+ /** Removes the map entry for key and frees its pointer if one was registered. */
+ public void free(String key);
+
+ /** Frees the pointer's memory; CacheServiceImpl does not touch the key map here. */
+ public void free(Pointer pointer);
+
+ /** Asks the memory manager to collect expired entries. */
+ public void collectExpired();
+
+ /** Asks the memory manager to run its LFU collection. */
+ public void collectLFU();
+
+ /** Runs collectExpired and collectLFU; CacheServiceImpl does so on a new thread. */
+ public void collectAll();
+
+
+ /** Empties the key map and clears the memory manager. */
+ public void clear();
+
+ /** Returns the number of keys currently in the map. */
+ public long entries();
+
+ /** Logs usage figures for one off-heap buffer together with JVM heap figures. */
+ public void dump(OffHeapMemoryBuffer mem);
+
+ /** Logs the figures of every buffer of the memory manager when info logging is enabled. */
+ public void dump();
+
+ /** Returns the key-to-pointer map backing this cache. */
+ public ConcurrentMap<String, Pointer> getMap();
+
+ /** Replaces the key-to-pointer map backing this cache. */
+ public void setMap(ConcurrentMap<String, Pointer> map);
+
+ /** Returns the serializer used by put, update and retrieve. */
+ public Serializer getSerializer();
+
+ /** Returns the memory manager this cache delegates storage to. */
+ public MemoryManagerService getMemoryManager();
+
+ /** Replaces the memory manager this cache delegates storage to. */
+ public void setMemoryManager(MemoryManagerService memoryManager);
+
+}
Copied:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
(from r1186494,
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java)
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java?p2=incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java&p1=incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java&r1=1186494&r2=1187543&rev=1187543&view=diff
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/Cache.java
(original)
+++
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
Fri Oct 21 21:14:51 2011
@@ -1,5 +1,3 @@
-package org.apache.directmemory.cache;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -9,7 +7,7 @@ package org.apache.directmemory.cache;
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,15 +17,19 @@ package org.apache.directmemory.cache;
* under the License.
*/
+package org.apache.directmemory.cache;
+
import java.io.EOFException;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
-
+import com.google.common.collect.MapMaker;
import org.apache.directmemory.measures.Every;
import org.apache.directmemory.measures.Ram;
import org.apache.directmemory.memory.MemoryManager;
+import org.apache.directmemory.memory.MemoryManagerService;
+import org.apache.directmemory.memory.MemoryManagerServiceImpl;
import org.apache.directmemory.memory.OffHeapMemoryBuffer;
import org.apache.directmemory.memory.Pointer;
import org.apache.directmemory.misc.Format;
@@ -36,209 +38,240 @@ import org.apache.directmemory.serializa
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.MapMaker;
+public class CacheServiceImpl implements CacheService {
+
+ private static Logger logger =
LoggerFactory.getLogger(CacheServiceImpl.class);
+ private ConcurrentMap<String, Pointer> map;
-public class Cache {
+ private Serializer serializer = new ProtoStuffSerializerV1();
+ private MemoryManagerService memoryManager = new MemoryManagerServiceImpl();
- private static Logger logger =
LoggerFactory.getLogger(MemoryManager.class);
- private static ConcurrentMap<String, Pointer> map;
-
- public static int DEFAULT_CONCURRENCY_LEVEL = 4;
- public static int DEFAULT_INITIAL_CAPACITY = 100000;
-
- public static Serializer serializer = new ProtoStuffSerializerV1();
-
- private Cache() {
- // not instantiable
- }
-
-
- private final static Timer timer = new Timer();
-
- public static void scheduleDisposalEvery(long l) {
- timer.schedule(new TimerTask() {
- public void run() {
- logger.info("begin scheduled disposal");
- collectExpired();
- collectLFU();
- logger.info("scheduled disposal complete");
- }
- }, l);
- logger.info("disposal scheduled every " + l + " milliseconds");
- }
-
- public static void init(int numberOfBuffers, int size, int
initialCapacity, int concurrencyLevel) {
- map = new MapMaker()
- .concurrencyLevel(concurrencyLevel)
- .initialCapacity(initialCapacity)
- .makeMap();
-
- logger.info("*** initializing
*******************************\r\n" + Format.logo());
- logger.info("************************************************");
- MemoryManager.init(numberOfBuffers, size);
- logger.info("initialized");
- logger.info(Format.it("number of buffer(s): \t%1d with %2s
each", numberOfBuffers, Ram.inMb(size)));
- logger.info(Format.it("initial capacity: \t%1d",
initialCapacity));
- logger.info(Format.it("concurrency level: \t%1d",
concurrencyLevel));
- scheduleDisposalEvery(Every.seconds(10));
- }
-
- public static void init(int numberOfBuffers, int size) {
- init(numberOfBuffers, size, DEFAULT_INITIAL_CAPACITY,
DEFAULT_CONCURRENCY_LEVEL);
- }
-
- public static Pointer putByteArray(String key, byte[] payload, int
expiresIn) {
- Pointer ptr = MemoryManager.store(payload, expiresIn);
- map.put(key, ptr);
- return ptr;
- }
-
- public static Pointer putByteArray(String key, byte[] payload) {
- return putByteArray(key, payload, 0);
- }
-
- public static Pointer put(String key, Object object) {
- return put(key, object, 0);
- }
-
- public static Pointer put(String key, Object object, int expiresIn) {
- try {
- byte[] payload = serializer.serialize(object,
object.getClass());
- Pointer ptr = putByteArray(key, payload, expiresIn);
- ptr.clazz = object.getClass();
- return ptr;
- } catch (IOException e) {
- logger.error(e.getMessage());
- return null;
- }
- }
-
- public static Pointer updateByteArray(String key, byte[] payload) {
- Pointer p = map.get(key);
- p = MemoryManager.update(p, payload);
- return p;
- }
-
- public static Pointer update(String key, Object object) {
- Pointer p = map.get(key);
- try {
- p = MemoryManager.update(p,
serializer.serialize(object, object.getClass()));
- p.clazz = object.getClass();
- return p;
- } catch (IOException e) {
- logger.error(e.getMessage());
- return null;
- }
- }
-
- public static byte[] retrieveByteArray(String key) {
- Pointer ptr = getPointer(key);
- if (ptr == null) return null;
- if (ptr.expired() || ptr.free) {
- map.remove(key);
- if (!ptr.free) {
- MemoryManager.free(ptr);
- }
- return null;
- } else {
- return MemoryManager.retrieve(ptr);
- }
- }
-
- public static Object retrieve(String key) {
- Pointer ptr = getPointer(key);
- if (ptr == null) return null;
- if (ptr.expired() || ptr.free) {
- map.remove(key);
- if (!ptr.free) {
- MemoryManager.free(ptr);
- }
- return null;
- } else {
- try {
- return
serializer.deserialize(MemoryManager.retrieve(ptr),ptr.clazz);
- } catch (EOFException e) {
- logger.error(e.getMessage());
- } catch (IOException e) {
- logger.error(e.getMessage());
- } catch (ClassNotFoundException e) {
- logger.error(e.getMessage());
- } catch (InstantiationException e) {
- logger.error(e.getMessage());
- } catch (IllegalAccessException e) {
- logger.error(e.getMessage());
- }
- }
- return null;
- }
-
- public static Pointer getPointer(String key) {
- return map.get(key);
- }
-
- public static void free(String key) {
- Pointer p = map.remove(key);
- if (p != null) {
- MemoryManager.free(p);
- }
- }
-
- public static void free(Pointer pointer) {
- MemoryManager.free(pointer);
- }
-
- public static void collectExpired() {
- MemoryManager.collectExpired();
- // still have to look for orphan (storing references to freed
pointers) map entries
- }
-
- public static void collectLFU() {
- MemoryManager.collectLFU();
- // can possibly clear one whole buffer if it's too fragmented -
investigate
- }
-
- public static void collectAll() {
- Thread thread = new Thread(){
- public void run(){
- logger.info("begin disposal");
- collectExpired();
- collectLFU();
- logger.info("disposal complete");
- }
- };
- thread.start();
- }
-
-
- public static void clear() {
- map.clear();
- MemoryManager.clear();
- logger.info("Cache cleared");
- }
-
- public static long entries() {
- return map.size();
- }
-
- private static void dump(OffHeapMemoryBuffer mem) {
- logger.info(Format.it("off-heap - buffer: \t%1d",
mem.bufferNumber));
- logger.info(Format.it("off-heap - allocated: \t%1s",
Ram.inMb(mem.capacity())));
- logger.info(Format.it("off-heap - used: \t%1s",
Ram.inMb(mem.used())));
- logger.info(Format.it("heap - max: \t%1s",
Ram.inMb(Runtime.getRuntime().maxMemory())));
- logger.info(Format.it("heap - allocated: \t%1s",
Ram.inMb(Runtime.getRuntime().totalMemory())));
- logger.info(Format.it("heap - free : \t%1s",
Ram.inMb(Runtime.getRuntime().freeMemory())));
- logger.info("************************************************");
- }
-
- public static void dump() {
- if (!logger.isInfoEnabled())
- return;
-
- logger.info("*** DirectMemory statistics ********************");
-
- for (OffHeapMemoryBuffer mem : MemoryManager.buffers) {
- dump(mem);
- }
- }
-
+    /** Timer whose thread runs the disposal tasks registered by {@link #scheduleDisposalEvery(long)}. */
+    private final Timer timer = new Timer();
+
+    /**
+     * Creates a cache service that keeps the {@link MemoryManagerServiceImpl}
+     * assigned by the field initializer. The key map stays {@code null} until
+     * {@link #init(int, int, int, int)} or {@link #setMap(ConcurrentMap)} is called.
+     */
+    public CacheServiceImpl() {
+    }
+
+    /**
+     * Creates a cache service that stores its payloads through the given memory manager.
+     *
+     * @param memoryManager the memory manager every storage operation is delegated to
+     */
+    public CacheServiceImpl(MemoryManagerService memoryManager) {
+        this.memoryManager = memoryManager;
+    }
+
+    /**
+     * Schedules a recurring disposal pass ({@link #collectExpired()} followed by
+     * {@link #collectLFU()}): first run after {@code l} milliseconds, then again
+     * {@code l} milliseconds after each run.
+     *
+     * @param l delay before the first run and between runs, in milliseconds
+     * @throws IllegalArgumentException if {@code l} is not positive (raised by the timer)
+     */
+    public void scheduleDisposalEvery(long l) {
+        // The (task, delay, period) overload is required for a recurring task;
+        // the two-argument overload would run the disposal only once.
+        timer.schedule(new TimerTask() {
+            public void run() {
+                logger.info("begin scheduled disposal");
+                collectExpired();
+                collectLFU();
+                logger.info("scheduled disposal complete");
+            }
+        }, l, l);
+        logger.info("disposal scheduled every " + l + " milliseconds");
+    }
+
+    /**
+     * Creates the key map, initializes the memory manager and schedules disposal
+     * every ten seconds. Each call registers one more disposal task on the timer.
+     *
+     * @param numberOfBuffers  number of buffers passed to the memory manager
+     * @param size             size of each buffer, passed to the memory manager
+     * @param initialCapacity  initial capacity of the key map
+     * @param concurrencyLevel concurrency level of the key map
+     */
+    public void init(int numberOfBuffers, int size, int initialCapacity, int concurrencyLevel) {
+        map = new MapMaker()
+                .concurrencyLevel(concurrencyLevel)
+                .initialCapacity(initialCapacity)
+                .makeMap();
+
+        logger.info("*** initializing *******************************\r\n" + Format.logo());
+        logger.info("************************************************");
+        memoryManager.init(numberOfBuffers, size);
+        logger.info("initialized");
+        logger.info(Format.it("number of buffer(s): \t%1d with %2s each", numberOfBuffers, Ram.inMb(size)));
+        logger.info(Format.it("initial capacity: \t%1d", initialCapacity));
+        logger.info(Format.it("concurrency level: \t%1d", concurrencyLevel));
+        scheduleDisposalEvery(Every.seconds(10));
+    }
+
+    /**
+     * Initializes the cache with {@link CacheService#DEFAULT_INITIAL_CAPACITY} and
+     * {@link CacheService#DEFAULT_CONCURRENCY_LEVEL}.
+     */
+    public void init(int numberOfBuffers, int size) {
+        init(numberOfBuffers, size, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
+    }
+
+    /**
+     * Stores the payload through the memory manager and registers the resulting
+     * pointer under {@code key}.
+     *
+     * @return the pointer returned by the memory manager
+     */
+    public Pointer putByteArray(String key, byte[] payload, int expiresIn) {
+        Pointer ptr = memoryManager.store(payload, expiresIn);
+        map.put(key, ptr);
+        return ptr;
+    }
+
+    /** Stores the payload with {@code expiresIn = 0}. */
+    public Pointer putByteArray(String key, byte[] payload) {
+        return putByteArray(key, payload, 0);
+    }
+
+    /** Serializes and stores the object with {@code expiresIn = 0}. */
+    public Pointer put(String key, Object object) {
+        return put(key, object, 0);
+    }
+
+    /**
+     * Serializes the object, stores the bytes under {@code key} and records the
+     * object's class on the pointer so that {@link #retrieve(String)} can deserialize it.
+     *
+     * @return the pointer, or {@code null} if serialization failed (the error is logged)
+     */
+    public Pointer put(String key, Object object, int expiresIn) {
+        try {
+            byte[] payload = serializer.serialize(object, object.getClass());
+            Pointer ptr = putByteArray(key, payload, expiresIn);
+            ptr.clazz = object.getClass();
+            return ptr;
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     * Replaces the bytes behind the pointer registered under {@code key}.
+     * The map entry itself is not rewritten here.
+     *
+     * @return the pointer returned by the memory manager
+     */
+    public Pointer updateByteArray(String key, byte[] payload) {
+        Pointer p = map.get(key);
+        // Use the injected manager, i.e. the one putByteArray stored into,
+        // not the static MemoryManager facade.
+        p = memoryManager.update(p, payload);
+        return p;
+    }
+
+    /**
+     * Serializes the object, replaces the bytes behind the pointer registered under
+     * {@code key} and records the object's class on the returned pointer.
+     * The map entry itself is not rewritten here.
+     *
+     * @return the pointer, or {@code null} if serialization failed (the error is logged)
+     */
+    public Pointer update(String key, Object object) {
+        Pointer p = map.get(key);
+        try {
+            p = memoryManager.update(p, serializer.serialize(object, object.getClass()));
+            p.clazz = object.getClass();
+            return p;
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     * Returns the stored bytes for {@code key}.
+     *
+     * @return the bytes, or {@code null} if the key is unknown or its pointer is
+     *         expired or free; in the latter case the map entry is removed and the
+     *         pointer is freed unless it already was
+     */
+    public byte[] retrieveByteArray(String key) {
+        Pointer ptr = getPointer(key);
+        if (ptr == null) return null;
+        if (ptr.expired() || ptr.free) {
+            map.remove(key);
+            if (!ptr.free) {
+                memoryManager.free(ptr);
+            }
+            return null;
+        } else {
+            return memoryManager.retrieve(ptr);
+        }
+    }
+
+    /**
+     * Returns the deserialized object stored under {@code key}.
+     *
+     * @return the object, or {@code null} if the key is unknown, its pointer is
+     *         expired or free (the entry is then removed as in
+     *         {@link #retrieveByteArray(String)}), or deserialization failed
+     *         (the error is logged)
+     */
+    public Object retrieve(String key) {
+        Pointer ptr = getPointer(key);
+        if (ptr == null) return null;
+        if (ptr.expired() || ptr.free) {
+            map.remove(key);
+            if (!ptr.free) {
+                memoryManager.free(ptr);
+            }
+            return null;
+        } else {
+            try {
+                return serializer.deserialize(memoryManager.retrieve(ptr), ptr.clazz);
+            } catch (EOFException e) {
+                logger.error(e.getMessage());
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            } catch (ClassNotFoundException e) {
+                logger.error(e.getMessage());
+            } catch (InstantiationException e) {
+                logger.error(e.getMessage());
+            } catch (IllegalAccessException e) {
+                logger.error(e.getMessage());
+            }
+        }
+        return null;
+    }
+
+    /** Returns the pointer registered under {@code key}, or {@code null} if there is none. */
+    public Pointer getPointer(String key) {
+        return map.get(key);
+    }
+
+    /** Removes the map entry for {@code key} and frees its pointer if one was registered. */
+    public void free(String key) {
+        Pointer p = map.remove(key);
+        if (p != null) {
+            memoryManager.free(p);
+        }
+    }
+
+    /** Frees the pointer's memory through the memory manager; the key map is not touched. */
+    public void free(Pointer pointer) {
+        memoryManager.free(pointer);
+    }
+
+    /** Asks the memory manager to collect expired entries. */
+    public void collectExpired() {
+        memoryManager.collectExpired();
+        // still have to look for orphan (storing references to freed pointers) map entries
+    }
+
+    /** Asks the memory manager to run its LFU collection. */
+    public void collectLFU() {
+        memoryManager.collectLFU();
+        // can possibly clear one whole buffer if it's too fragmented - investigate
+    }
+
+    /** Starts a new thread that runs {@link #collectExpired()} and then {@link #collectLFU()}. */
+    public void collectAll() {
+        Thread thread = new Thread() {
+            public void run() {
+                logger.info("begin disposal");
+                collectExpired();
+                collectLFU();
+                logger.info("disposal complete");
+            }
+        };
+        thread.start();
+    }
+
+
+    /** Empties the key map and clears the memory manager. */
+    public void clear() {
+        map.clear();
+        memoryManager.clear();
+        logger.info("Cache cleared");
+    }
+
+    /** Returns the number of keys currently in the map. */
+    public long entries() {
+        return map.size();
+    }
+
+    /** Logs capacity and usage of the given buffer together with the JVM heap figures. */
+    public void dump(OffHeapMemoryBuffer mem) {
+        logger.info(Format.it("off-heap - buffer: \t%1d", mem.bufferNumber));
+        logger.info(Format.it("off-heap - allocated: \t%1s", Ram.inMb(mem.capacity())));
+        logger.info(Format.it("off-heap - used: \t%1s", Ram.inMb(mem.used())));
+        logger.info(Format.it("heap - max: \t%1s", Ram.inMb(Runtime.getRuntime().maxMemory())));
+        logger.info(Format.it("heap - allocated: \t%1s", Ram.inMb(Runtime.getRuntime().totalMemory())));
+        logger.info(Format.it("heap - free : \t%1s", Ram.inMb(Runtime.getRuntime().freeMemory())));
+        logger.info("************************************************");
+    }
+
+    /** Logs the figures of every buffer of the memory manager; does nothing unless info logging is enabled. */
+    public void dump() {
+        if (!logger.isInfoEnabled())
+            return;
+
+        logger.info("*** DirectMemory statistics ********************");
+
+        for (OffHeapMemoryBuffer mem : memoryManager.getBuffers()) {
+            dump(mem);
+        }
+    }
+
+    /** Returns the key-to-pointer map backing this cache ({@code null} before initialization). */
+    public ConcurrentMap<String, Pointer> getMap() {
+        return map;
+    }
+
+    /** Replaces the key-to-pointer map backing this cache. */
+    public void setMap(ConcurrentMap<String, Pointer> map) {
+        this.map = map;
+    }
+
+    /** Returns the serializer used by put, update and retrieve. */
+    public Serializer getSerializer() {
+        return serializer;
+    }
+
+    /** Replaces the serializer used by put, update and retrieve. */
+    public void setSerializer(Serializer serializer) {
+        this.serializer = serializer;
+    }
+
+    /** Returns the memory manager this cache delegates storage to. */
+    public MemoryManagerService getMemoryManager() {
+        return memoryManager;
+    }
+
+    /** Replaces the memory manager this cache delegates storage to. */
+    public void setMemoryManager(MemoryManagerService memoryManager) {
+        this.memoryManager = memoryManager;
+    }
}
Propchange:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
(original)
+++
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java
Fri Oct 21 21:14:51 2011
@@ -20,42 +20,23 @@ package org.apache.directmemory.memory;
*/
import java.util.List;
-import java.util.Vector;
-
-import org.apache.directmemory.measures.Ram;
-import org.apache.directmemory.misc.Format;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MemoryManager {
private static Logger logger =
LoggerFactory.getLogger(MemoryManager.class);
- public static List<OffHeapMemoryBuffer> buffers = new
Vector<OffHeapMemoryBuffer>();
- public static OffHeapMemoryBuffer activeBuffer = null;
+ private static MemoryManagerService memoryManager = new
MemoryManagerServiceImpl();;
private MemoryManager() {
//static class
}
public static void init(int numberOfBuffers, int size) {
- for (int i = 0; i < numberOfBuffers; i++) {
- buffers.add(OffHeapMemoryBuffer.createNew(size, i));
- }
- activeBuffer = buffers.get(0);
- logger.info(Format.it("MemoryManager initialized - %d buffers,
%s each", numberOfBuffers, Ram.inMb(size)));
+ memoryManager.init(numberOfBuffers,size);
}
public static Pointer store(byte[] payload, int expiresIn) {
- Pointer p = activeBuffer.store(payload, expiresIn);
- if (p == null) {
- if (activeBuffer.bufferNumber+1 == buffers.size()) {
- return null;
- } else {
- // try next buffer
- activeBuffer =
buffers.get(activeBuffer.bufferNumber+1);
- p = activeBuffer.store(payload, expiresIn);
- }
- }
- return p;
+ return memoryManager.store(payload,expiresIn);
}
public static Pointer store(byte[] payload) {
@@ -63,54 +44,43 @@ public class MemoryManager {
}
public static Pointer update(Pointer pointer, byte[] payload) {
- Pointer p = activeBuffer.update(pointer, payload);
- if (p == null) {
- if (activeBuffer.bufferNumber == buffers.size()) {
- return null;
- } else {
- // try next buffer
- activeBuffer =
buffers.get(activeBuffer.bufferNumber+1);
- p = activeBuffer.store(payload);
- }
- }
- return p;
+ return memoryManager.update(pointer,payload);
}
public static byte[] retrieve(Pointer pointer) {
- return buffers.get(pointer.bufferNumber).retrieve(pointer);
+ return memoryManager.retrieve(pointer);
}
public static void free(Pointer pointer) {
- buffers.get(pointer.bufferNumber).free(pointer);
+ memoryManager.free(pointer);
}
public static void clear() {
- for (OffHeapMemoryBuffer buffer : buffers) {
- buffer.clear();
- }
- activeBuffer = buffers.get(0);
+ memoryManager.clear();
}
public static long capacity() {
- long totalCapacity = 0;
- for (OffHeapMemoryBuffer buffer : buffers) {
- totalCapacity += buffer.capacity();
- }
- return totalCapacity;
+ return memoryManager.capacity();
}
public static long collectExpired() {
- long disposed = 0;
- for (OffHeapMemoryBuffer buffer : buffers) {
- disposed += buffer.collectExpired();
- }
- return disposed;
+ return memoryManager.collectExpired();
}
public static void collectLFU() {
- for (OffHeapMemoryBuffer buf : MemoryManager.buffers) {
- buf.collectLFU(-1);
- }
+ memoryManager.collectLFU();
}
+ public static List<OffHeapMemoryBuffer> getBuffers() {
+ return memoryManager.getBuffers();
+ }
+
+
+ public static OffHeapMemoryBuffer getActiveBuffer() {
+ return memoryManager.getActiveBuffer();
+ }
+
+ public static MemoryManagerService getMemoryManager() {
+ return memoryManager;
+ }
}
Added:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java?rev=1187543&view=auto
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
(added)
+++
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java
Fri Oct 21 21:14:51 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.directmemory.memory;
+
+
+import java.util.List;
+
+/**
+ * Contract for a memory manager that keeps {@code byte[]} payloads in a list of
+ * {@link OffHeapMemoryBuffer}s and addresses them through {@link Pointer}s.
+ */
+public interface MemoryManagerService {
+
+    /**
+     * Sets the manager up with its buffers.
+     *
+     * @param numberOfBuffers how many buffers to set up
+     * @param size            size requested for each buffer
+     */
+    void init(int numberOfBuffers, int size);
+
+    /**
+     * Stores a payload together with an expiry setting.
+     *
+     * @param payload   bytes to store
+     * @param expiresIn expiry setting handed on with the payload (units are not defined here)
+     * @return pointer to the stored payload; {@link MemoryManagerServiceImpl} returns
+     *         {@code null} when it has no further buffer to fall back to
+     */
+    Pointer store(byte[] payload, int expiresIn);
+
+    /**
+     * Stores a payload without an explicit expiry setting.
+     * {@link MemoryManagerServiceImpl} treats this as {@code store(payload, 0)}.
+     *
+     * @param payload bytes to store
+     * @return pointer to the stored payload, see {@link #store(byte[], int)}
+     */
+    Pointer store(byte[] payload);
+
+    /**
+     * Updates the entry referenced by {@code pointer} with a new payload.
+     *
+     * @param pointer entry to update
+     * @param payload new bytes
+     * @return pointer to the entry holding the new payload; callers should use the returned
+     *         pointer rather than assume it equals {@code pointer}
+     */
+    Pointer update(Pointer pointer, byte[] payload);
+
+    /**
+     * Reads the payload referenced by {@code pointer}.
+     *
+     * @param pointer entry to read
+     * @return the bytes retrieved for that entry
+     */
+    byte[] retrieve(Pointer pointer);
+
+    /**
+     * Frees the entry referenced by {@code pointer}.
+     *
+     * @param pointer entry to free
+     */
+    void free(Pointer pointer);
+
+    /** Clears the managed buffers. */
+    void clear();
+
+    /**
+     * Reports the capacity of the manager.
+     *
+     * @return the capacity; {@link MemoryManagerServiceImpl} sums the capacity of every buffer
+     */
+    long capacity();
+
+    /**
+     * Runs expired-entry collection.
+     *
+     * @return the amount disposed; {@link MemoryManagerServiceImpl} sums what each buffer reports
+     */
+    long collectExpired();
+
+    /** Runs LFU collection on the managed buffers. */
+    void collectLFU();
+
+    /**
+     * @return the buffers managed by this service
+     */
+    List<OffHeapMemoryBuffer> getBuffers();
+
+    /**
+     * @return the buffer currently used for new payloads
+     */
+    OffHeapMemoryBuffer getActiveBuffer();
+
+}
Added:
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java?rev=1187543&view=auto
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
(added)
+++
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
Fri Oct 21 21:14:51 2011
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.directmemory.memory;
+
+import java.util.List;
+import java.util.Vector;
+import org.apache.directmemory.measures.Ram;
+import org.apache.directmemory.misc.Format;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instance-based {@link MemoryManagerService} that spreads payloads over a list of
+ * {@link OffHeapMemoryBuffer}s. New payloads go to the active buffer; when that buffer
+ * cannot take a payload the manager moves on to the buffer with the next number.
+ */
+public class MemoryManagerServiceImpl implements MemoryManagerService {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryManagerServiceImpl.class);
+
+    /** Managed buffers; {@link #retrieve} and {@link #free} look them up by {@code Pointer.bufferNumber}. */
+    public List<OffHeapMemoryBuffer> buffers = new Vector<OffHeapMemoryBuffer>();
+
+    /** Buffer that {@link #store(byte[], int)} and {@link #update} write to; {@code null} until {@link #init}. */
+    public OffHeapMemoryBuffer activeBuffer = null;
+
+    public MemoryManagerServiceImpl() {
+    }
+
+    /**
+     * Creates {@code numberOfBuffers} buffers numbered from 0, appends them to
+     * {@link #buffers} and makes the first buffer in the list the active one.
+     *
+     * @param numberOfBuffers how many buffers to create
+     * @param size            size passed to {@code OffHeapMemoryBuffer.createNew} for each buffer
+     */
+    public void init(int numberOfBuffers, int size) {
+        for (int i = 0; i < numberOfBuffers; i++) {
+            buffers.add(OffHeapMemoryBuffer.createNew(size, i));
+        }
+        activeBuffer = buffers.get(0);
+        logger.info(Format.it("MemoryManager initialized - %d buffers, %s each",
+                numberOfBuffers, Ram.inMb(size)));
+    }
+
+    /**
+     * Stores the payload in the active buffer, falling back once to the next buffer
+     * (which then becomes the active one) when the active buffer returns {@code null}.
+     *
+     * @param payload   bytes to store
+     * @param expiresIn expiry setting forwarded to the buffer
+     * @return the pointer returned by the buffer, or {@code null} when the active buffer is
+     *         the last one and could not store the payload (or the fallback buffer also failed)
+     */
+    public Pointer store(byte[] payload, int expiresIn) {
+        Pointer p = activeBuffer.store(payload, expiresIn);
+        if (p == null) {
+            if (isLastBufferActive()) {
+                return null;
+            }
+            // try next buffer
+            activeBuffer = buffers.get(activeBuffer.bufferNumber + 1);
+            p = activeBuffer.store(payload, expiresIn);
+        }
+        return p;
+    }
+
+    /**
+     * Stores the payload with an {@code expiresIn} of 0.
+     *
+     * @param payload bytes to store
+     * @return see {@link #store(byte[], int)}
+     */
+    public Pointer store(byte[] payload) {
+        return store(payload, 0);
+    }
+
+    /**
+     * Updates the entry through the active buffer; when that returns {@code null} the payload
+     * is stored in the next buffer instead, which then becomes the active one.
+     *
+     * @param pointer entry to update
+     * @param payload new bytes
+     * @return the pointer returned by the buffer, or {@code null} when the active buffer is
+     *         the last one and could not take the update (or the fallback store also failed)
+     */
+    public Pointer update(Pointer pointer, byte[] payload) {
+        // NOTE(review): the update always goes through activeBuffer, whereas retrieve/free
+        // select the buffer by pointer.bufferNumber - confirm this is intended.
+        Pointer p = activeBuffer.update(pointer, payload);
+        if (p == null) {
+            // Same boundary test as store(): without the "+ 1" the check could never match
+            // and buffers.get() was asked for an index one past the last buffer.
+            if (isLastBufferActive()) {
+                return null;
+            }
+            // try next buffer
+            activeBuffer = buffers.get(activeBuffer.bufferNumber + 1);
+            p = activeBuffer.store(payload);
+        }
+        return p;
+    }
+
+    /**
+     * Reads the payload from the buffer selected by {@code pointer.bufferNumber}.
+     *
+     * @param pointer entry to read
+     * @return the bytes returned by that buffer
+     */
+    public byte[] retrieve(Pointer pointer) {
+        return buffers.get(pointer.bufferNumber).retrieve(pointer);
+    }
+
+    /**
+     * Frees the entry in the buffer selected by {@code pointer.bufferNumber}.
+     *
+     * @param pointer entry to free
+     */
+    public void free(Pointer pointer) {
+        buffers.get(pointer.bufferNumber).free(pointer);
+    }
+
+    /** Clears every buffer and makes the first buffer in the list the active one again. */
+    public void clear() {
+        for (OffHeapMemoryBuffer buffer : buffers) {
+            buffer.clear();
+        }
+        activeBuffer = buffers.get(0);
+    }
+
+    /**
+     * @return the sum of {@code capacity()} over all buffers
+     */
+    public long capacity() {
+        long totalCapacity = 0;
+        for (OffHeapMemoryBuffer buffer : buffers) {
+            totalCapacity += buffer.capacity();
+        }
+        return totalCapacity;
+    }
+
+    /**
+     * Runs {@code collectExpired()} on every buffer.
+     *
+     * @return the sum of the values the buffers returned
+     */
+    public long collectExpired() {
+        long disposed = 0;
+        for (OffHeapMemoryBuffer buffer : buffers) {
+            disposed += buffer.collectExpired();
+        }
+        return disposed;
+    }
+
+    /** Runs {@code collectLFU(-1)} on every buffer. */
+    public void collectLFU() {
+        for (OffHeapMemoryBuffer buf : buffers) {
+            buf.collectLFU(-1);
+        }
+    }
+
+    public List<OffHeapMemoryBuffer> getBuffers() {
+        return buffers;
+    }
+
+    public void setBuffers(List<OffHeapMemoryBuffer> buffers) {
+        this.buffers = buffers;
+    }
+
+    public OffHeapMemoryBuffer getActiveBuffer() {
+        return activeBuffer;
+    }
+
+    public void setActiveBuffer(OffHeapMemoryBuffer activeBuffer) {
+        this.activeBuffer = activeBuffer;
+    }
+
+    /** Tells whether the active buffer is the one with the highest number in {@link #buffers}. */
+    private boolean isLastBufferActive() {
+        return activeBuffer.bufferNumber + 1 == buffers.size();
+    }
+}
Modified:
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java
(original)
+++
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests2.java
Fri Oct 21 21:14:51 2011
@@ -191,7 +191,7 @@ public class ConcurrentTests2 {
@AfterClass
public static void dump() {
- for (OffHeapMemoryBuffer mem : MemoryManager.buffers) {
+ for (OffHeapMemoryBuffer mem : MemoryManager.getBuffers()) {
dump(mem);
}
Modified:
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java
(original)
+++
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/ConcurrentTests3.java
Fri Oct 21 21:14:51 2011
@@ -222,7 +222,7 @@ public class ConcurrentTests3 {
@AfterClass
public static void dump() {
- for (OffHeapMemoryBuffer mem : MemoryManager.buffers) {
+ for (OffHeapMemoryBuffer mem : MemoryManager.getBuffers()) {
dump(mem);
}
Modified:
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java
(original)
+++
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/MemoryManagerTests.java
Fri Oct 21 21:14:51 2011
@@ -58,9 +58,9 @@ public class MemoryManagerTests {
logger.info("stored");
assertNotNull(p);
assertEquals(size,p.end);
- assertEquals(size, MemoryManager.activeBuffer.used());
+ assertEquals(size, MemoryManager.getActiveBuffer().used());
MemoryManager.free(p);
- assertEquals(0, MemoryManager.activeBuffer.used());
+ assertEquals(0, MemoryManager.getActiveBuffer().used());
logger.info("end");
}
@@ -85,7 +85,7 @@ public class MemoryManagerTests {
@Test
public void readTest() {
- for (OffHeapMemoryBuffer buffer : MemoryManager.buffers) {
+ for (OffHeapMemoryBuffer buffer : MemoryManager.getBuffers()) {
for (Pointer ptr : buffer.pointers) {
if (!ptr.free) {
byte[] res =
MemoryManager.retrieve(ptr);
Modified:
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java
URL:
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java?rev=1187543&r1=1187542&r2=1187543&view=diff
==============================================================================
---
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java
(original)
+++
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/test/Starter.java
Fri Oct 21 21:14:51 2011
@@ -106,7 +106,7 @@ public class Starter {
logger.info("...done in " + (System.currentTimeMillis() -
start) + " msecs.");
logger.info("---------------------------------");
- for (OffHeapMemoryBuffer buf : MemoryManager.buffers) {
+ for (OffHeapMemoryBuffer buf : MemoryManager.getBuffers()) {
dump(buf);
}
}