Hi all, I was able to release the new ByteBufferAllocator interface and two of its implementations: merging and slab-style allocation. All feedback is welcome.
Some work remains on finding an easy way to switch from one implementation to the other (using a builder?) and on documentation. I will actively work on the second one, and if someone has an idea for the first one, don't hesitate to share it. A small side note that I will also put on the wiki: if you want to run DM with big numbers, you need to update the -XX:MaxDirectMemorySize parameter; for instance, I'm running with -XX:MaxDirectMemorySize=2000m. Best, Benoit. 2012/3/1 <[email protected]>: > Author: bperroud > Date: Thu Mar 1 11:41:34 2012 > New Revision: 1295522 > > URL: http://svn.apache.org/viewvc?rev=1295522&view=rev > Log: > DIRECTMEMORY-40, DIRECTMEMORY-60 : separate responsabilities more clearly, > set allocation with merging pointers as the default one, add a SLAB's style > allocator (fixed buffer size), mark OffHeapMemoryBuffer as deprecated > > Added: > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/AbstractByteBufferAllocator.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/ByteBufferAllocator.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/DirectByteBufferUtils.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImpl.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImpl.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImpl.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/ > > 
incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/FixedSizeByteBufferAllocatorImplTest.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/MergingByteBufferAllocatorImplTest.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/allocator/SlabByteBufferAllocatorImplTest.java > Removed: > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferImpl.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplWithMerginOHMBAndAllocationPolicyTest.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplWithMerginOHMBTest.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/OffHeapMergingMemoryBufferTest.java > Modified: > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceWithAllocationPolicyImpl.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBuffer.java > > 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/OffHeapMemoryBufferImpl.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java > > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/RoundRobinAllocationPolicy.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/cache/CacheServiceImplTest.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/AbstractOffHeapMemoryBufferTest.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent2Test.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Concurrent3Test.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerServiceImplTest.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/MemoryManagerTest.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/RoundRobinAllocationPolicyTest.java > > incubator/directmemory/trunk/directmemory-cache/src/test/java/org/apache/directmemory/memory/Starter.java > > incubator/directmemory/trunk/integrations/ehcache/src/main/java/org/apache/directmemory/ehcache/DirectMemoryCache.java > > incubator/directmemory/trunk/integrations/ehcache/src/main/java/org/apache/directmemory/ehcache/DirectMemoryStore.java > > incubator/directmemory/trunk/integrations/ehcache/src/test/java/org/apache/directmemory/ehcache/EHCacheTest.java > > incubator/directmemory/trunk/itests/osgi/src/test/java/org/apache/directmemory/tests/osgi/cache/CacheServiceExportingActivator.java > > Modified: > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java > URL: > 
http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff > ============================================================================== > --- > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java > (original) > +++ > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/cache/CacheServiceImpl.java > Thu Mar 1 11:41:34 2012 > @@ -295,7 +295,17 @@ public class CacheServiceImpl<K, V> > logger.info( format( "off-heap - buffer: \t%1d", > mem.getBufferNumber() ) ); > logger.info( format( "off-heap - allocated: \t%1s", Ram.inMb( > mem.capacity() ) ) ); > logger.info( format( "off-heap - used: \t%1s", Ram.inMb( > mem.used() ) ) ); > - logger.info( format( "heap - max: \t%1s", Ram.inMb( > Runtime.getRuntime().maxMemory() ) ) ); > + logger.info( format( "heap - max: \t%1s", Ram.inMb( > Runtime.getRuntime().maxMemory() ) ) ); > + logger.info( format( "heap - allocated: \t%1s", Ram.inMb( > Runtime.getRuntime().totalMemory() ) ) ); > + logger.info( format( "heap - free : \t%1s", Ram.inMb( > Runtime.getRuntime().freeMemory() ) ) ); > + logger.info( "************************************************" ); > + } > + > + public void dump( MemoryManagerService<V> mms ) > + { > + logger.info( format( "off-heap - allocated: \t%1s", Ram.inMb( > mms.capacity() ) ) ); > + logger.info( format( "off-heap - used: \t%1s", Ram.inMb( > mms.used() ) ) ); > + logger.info( format( "heap - max: \t%1s", Ram.inMb( > Runtime.getRuntime().maxMemory() ) ) ); > logger.info( format( "heap - allocated: \t%1s", Ram.inMb( > Runtime.getRuntime().totalMemory() ) ) ); > logger.info( format( "heap - free : \t%1s", Ram.inMb( > Runtime.getRuntime().freeMemory() ) ) ); > logger.info( "************************************************" ); > @@ -311,10 +321,7 @@ public class CacheServiceImpl<K, V> 
> > logger.info( "*** DirectMemory statistics ********************" ); > > - for ( OffHeapMemoryBuffer<V> mem : memoryManager.getBuffers() ) > - { > - dump( mem ); > - } > + dump( memoryManager ); > } > > @Override > > Modified: > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java > URL: > http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java?rev=1295522&r1=1295521&r2=1295522&view=diff > ============================================================================== > --- > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java > (original) > +++ > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/AllocationPolicy.java > Thu Mar 1 11:41:34 2012 > @@ -21,6 +21,8 @@ package org.apache.directmemory.memory; > > import java.util.List; > > +import org.apache.directmemory.memory.allocator.ByteBufferAllocator; > + > /** > * Interface describing the buffer allocation policy. > * The implementations will be initialized by setting the list of buffers > {@link #init(List)}, > @@ -30,7 +32,7 @@ import java.util.List; > * @author bperroud > * > */ > -public interface AllocationPolicy<T> > +public interface AllocationPolicy > { > > /** > @@ -38,16 +40,16 @@ public interface AllocationPolicy<T> > * > * @param buffers > */ > - void init( List<OffHeapMemoryBuffer<T>> buffers ); > + void init( List<ByteBufferAllocator> allocators ); > > /** > - * Returns the active buffer in which to allocate. > + * Returns the {@link ByteBufferAllocator} to use to allocate. 
> * > - * @param previouslyAllocatedBuffer : the previously allocated buffer, > or null if it's the first allocation > + * @param previousAllocator : the previously used {@link > ByteBufferAllocator}, or null if it's the first allocation > * @param allocationNumber : the number of time the allocation has > already failed. > - * @return the buffer to allocate, or null if allocation has failed. > + * @return the {@link ByteBufferAllocator} to use, or null if allocation > has failed. > */ > - OffHeapMemoryBuffer<T> getActiveBuffer( OffHeapMemoryBuffer<T> > previouslyAllocatedBuffer, int allocationNumber ); > + ByteBufferAllocator getActiveAllocator( ByteBufferAllocator > previousAllocator, int allocationNumber ); > > /** > * Reset internal state > > Modified: > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java > URL: > http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java?rev=1295522&r1=1295521&r2=1295522&view=diff > ============================================================================== > --- > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java > (original) > +++ > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManager.java > Thu Mar 1 11:41:34 2012 > @@ -22,8 +22,6 @@ package org.apache.directmemory.memory; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > -import java.util.List; > - > public class MemoryManager > { > private static Logger logger = LoggerFactory.getLogger( > MemoryManager.class ); > @@ -85,17 +83,6 @@ public class MemoryManager > memoryManager.collectLFU(); > } > > - public static List<OffHeapMemoryBuffer<Object>> getBuffers() > - { > - return memoryManager.getBuffers(); > - } > - > - > - public static OffHeapMemoryBuffer<Object> getActiveBuffer() > - { > - return 
memoryManager.getActiveBuffer(); > - } > - > public static MemoryManagerService<Object> getMemoryManager() > { > return memoryManager; > > Modified: > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java > URL: > http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java?rev=1295522&r1=1295521&r2=1295522&view=diff > ============================================================================== > --- > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java > (original) > +++ > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerService.java > Thu Mar 1 11:41:34 2012 > @@ -19,7 +19,6 @@ package org.apache.directmemory.memory; > * under the License. > */ > > -import java.util.List; > > public interface MemoryManagerService<V> > { > @@ -79,14 +78,12 @@ public interface MemoryManagerService<V> > > long capacity(); > > + long used(); > + > long collectExpired(); > > void collectLFU(); > > - List<OffHeapMemoryBuffer<V>> getBuffers(); > - > - OffHeapMemoryBuffer<V> getActiveBuffer(); > - > <T extends V> Pointer<V> allocate( Class<T> type, int size, long > expiresIn, long expires ); > - > + > } > > Modified: > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java > URL: > http://svn.apache.org/viewvc/incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java?rev=1295522&r1=1295521&r2=1295522&view=diff > ============================================================================== > --- > incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java > (original) > +++ > 
incubator/directmemory/trunk/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java > Thu Mar 1 11:41:34 2012 > @@ -19,144 +19,352 @@ package org.apache.directmemory.memory; > * under the License. > */ > > +import static com.google.common.collect.Iterables.filter; > +import static com.google.common.collect.Iterables.limit; > +import static com.google.common.collect.Ordering.from; > import static java.lang.String.format; > > +import java.nio.BufferOverflowException; > +import java.nio.ByteBuffer; > import java.util.ArrayList; > +import java.util.Collections; > +import java.util.Comparator; > import java.util.List; > +import java.util.Set; > +import java.util.concurrent.ConcurrentHashMap; > +import java.util.concurrent.atomic.AtomicLong; > > import org.apache.directmemory.measures.Ram; > +import org.apache.directmemory.memory.allocator.ByteBufferAllocator; > +import > org.apache.directmemory.memory.allocator.MergingByteBufferAllocatorImpl; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > +import com.google.common.base.Predicate; > + > public class MemoryManagerServiceImpl<V> > implements MemoryManagerService<V> > { > > + protected static final long NEVER_EXPIRES = 0L; > + > protected static Logger logger = LoggerFactory.getLogger( > MemoryManager.class ); > > - protected List<OffHeapMemoryBuffer<V>> buffers = new > ArrayList<OffHeapMemoryBuffer<V>>(); > + private List<ByteBufferAllocator> allocators; > > - protected int activeBufferIndex = 0; > + private final Set<Pointer<V>> pointers = Collections.newSetFromMap( new > ConcurrentHashMap<Pointer<V>, Boolean>() ); > + > + protected int activeAllocatorIndex = 0; > + > + private final boolean returnNullWhenFull; > + > + protected final AtomicLong used = new AtomicLong( 0L ); > > public MemoryManagerServiceImpl() > { > + this( true ); > + } > + > + public MemoryManagerServiceImpl( final boolean returnNullWhenFull ) > + { > + this.returnNullWhenFull = 
returnNullWhenFull; > } > > + @Override > public void init( int numberOfBuffers, int size ) > { > - buffers = new ArrayList<OffHeapMemoryBuffer<V>>( numberOfBuffers ); > > + allocators = new ArrayList<ByteBufferAllocator>( numberOfBuffers ); > + > for ( int i = 0; i < numberOfBuffers; i++ ) > { > - final OffHeapMemoryBuffer<V> offHeapMemoryBuffer = > instanciateOffHeapMemoryBuffer( size, i ); > - buffers.add( offHeapMemoryBuffer ); > + final ByteBufferAllocator allocator = > instanciateByteBufferAllocator( i, size ); > + allocators.add( allocator ); > } > > logger.info( format( "MemoryManager initialized - %d buffers, %s > each", numberOfBuffers, Ram.inMb( size ) ) ); > } > > - protected OffHeapMemoryBuffer<V> instanciateOffHeapMemoryBuffer( int > size, int bufferNumber ) > + > + protected ByteBufferAllocator instanciateByteBufferAllocator( final int > allocatorNumber, final int size ) > { > - return OffHeapMemoryBufferImpl.createNew( size, bufferNumber ); > + final MergingByteBufferAllocatorImpl allocator = new > MergingByteBufferAllocatorImpl( allocatorNumber, size ); > + > + // Hack to ensure the pointers are always splitted as it was the > case before. 
> + allocator.setMinSizeThreshold( 0.0 ); > + allocator.setSizeRatioThreshold( 1.0 ); > + > + return allocator; > } > > - public OffHeapMemoryBuffer<V> getActiveBuffer() > + protected ByteBufferAllocator getAllocator( int allocatorIndex ) > { > - return buffers.get( activeBufferIndex ); > + return allocators.get( allocatorIndex ); > } > > + @Override > public Pointer<V> store( byte[] payload, int expiresIn ) > { > - Pointer<V> p = getActiveBuffer().store( payload, expiresIn ); > - if ( p == null ) > + > + int allocatorIndex = activeAllocatorIndex; > + > + ByteBuffer buffer = getAllocator( allocatorIndex ).allocate( > payload.length ); > + > + if (buffer == null && allocators.size() > 1) > { > - nextBuffer(); > - p = getActiveBuffer().store( payload, expiresIn ); > + allocatorIndex = nextAllocator(); > + buffer = getAllocator( allocatorIndex ).allocate( payload.length > ); > } > + > + if (buffer == null) > + { > + if (returnsNullWhenFull()) > + { > + return null; > + } > + else > + { > + throw new BufferOverflowException(); > + } > + } > + > + buffer.rewind(); > + buffer.put( payload ); > + > + Pointer<V> p = instanciatePointer( buffer, allocatorIndex, > expiresIn, NEVER_EXPIRES ); > + > + used.addAndGet( payload.length ); > + > return p; > } > > + @Override > public Pointer<V> store( byte[] payload ) > { > return store( payload, 0 ); > } > > + @Override > public Pointer<V> update( Pointer<V> pointer, byte[] payload ) > { > - return buffers.get( pointer.getBufferNumber() ).update( pointer, > payload ); > + free( pointer ); > + return store( payload ); > } > > - public byte[] retrieve( Pointer<V> pointer ) > + @Override > + public byte[] retrieve( final Pointer<V> pointer ) > { > - return buffers.get( pointer.getBufferNumber() ).retrieve( pointer ); > + // check if pointer has not been freed before > + if (!pointers.contains( pointer )) > + { > + return null; > + } > + > + pointer.hit(); > + > + final ByteBuffer buf = pointer.getDirectBuffer().asReadOnlyBuffer(); > + 
buf.rewind(); > + > + final byte[] swp = new byte[buf.limit()]; > + buf.get( swp ); > + return swp; > } > > - public void free( Pointer<V> pointer ) > + @Override > + public void free( final Pointer<V> pointer ) > { > - buffers.get( pointer.getBufferNumber() ).free( pointer ); > + if ( !pointers.remove( pointer ) ) > + { > + // pointers has been already freed. > + //throw new IllegalArgumentException( "This pointer " + pointer > + " has already been freed" ); > + return; > + } > + > + getAllocator( pointer.getBufferNumber() ).free( > pointer.getDirectBuffer() ); > + > + used.addAndGet( - pointer.getCapacity() ); > + > + pointer.setFree( true ); > } > > + @Override > public void clear() > { > - for ( OffHeapMemoryBuffer<V> buffer : buffers ) > + pointers.clear(); > + for (ByteBufferAllocator allocator : allocators) > { > - buffer.clear(); > + allocator.clear(); > } > - activeBufferIndex = 0; > } > > + @Override > public long capacity() > { > long totalCapacity = 0; > - for ( OffHeapMemoryBuffer<V> buffer : buffers ) > + for (ByteBufferAllocator allocator : allocators) > { > - totalCapacity += buffer.capacity(); > + totalCapacity += allocator.getCapacity(); > } > return totalCapacity; > } > + > + @Override > + public long used() > + { > + return used.get(); > + } > > - public long collectExpired() > + private final Predicate<Pointer<V>> relative = new > Predicate<Pointer<V>>() > { > - long disposed = 0; > - for ( OffHeapMemoryBuffer<V> buffer : buffers ) > + > + @Override > + public boolean apply( Pointer<V> input ) > { > - disposed += buffer.collectExpired(); > + return !input.isFree() && !input.isExpired(); > } > - return disposed; > - } > > - public void collectLFU() > + }; > + > + private final Predicate<Pointer<V>> absolute = new > Predicate<Pointer<V>>() > { > - for ( OffHeapMemoryBuffer<V> buf : buffers ) > + > + @Override > + public boolean apply( Pointer<V> input ) > { > - buf.collectLFU( -1 ); > + return !input.isFree() && !input.isExpired(); > } > - } > > 
- public List<OffHeapMemoryBuffer<V>> getBuffers() > + }; > + > + @Override > + public long collectExpired() > { > - return buffers; > + int limit = 50; > + return free( limit( filter( pointers, relative ), limit ) ) > + + free( limit( filter( pointers, absolute ), limit ) > ); > + > } > > - public void setBuffers( List<OffHeapMemoryBuffer<V>> buffers ) > + @Override > + public void collectLFU() > { > - this.buffers = buffers; > + > + int limit = pointers.size() / 10; > + > + Iterable<Pointer<V>> result = from( new Comparator<Pointer<V>>() > + { > + > + public int compare( Pointer<V> o1, Pointer<V> o2 ) > + { > + float f1 = o1.getFrequency(); > + float f2 = o2.getFrequency(); > + > + return Float.compare( f1, f2 ); > + }
