Author: xedin
Date: Wed Oct  5 22:59:31 2011
New Revision: 1179467

URL: http://svn.apache.org/viewvc?rev=1179467&view=rev
Log:
off-heap cache to use sun.misc.Unsafe instead of JNA
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3271

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
    cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
    cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Oct  5 22:59:31 2011
@@ -3,7 +3,7 @@
  * Thrift sockets are not properly buffered (CASSANDRA-3261)
  * performance improvement for bytebufferutil compare function (CASSANDRA-3286)
  * add system.versions ColumnFamily (CASSANDRA-3140)
-
+ * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
 
 1.0.0-final
  * fix bug preventing obsolete commitlog segments from being removed

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java Wed Oct  5 22:59:31 2011
@@ -20,11 +20,9 @@ package org.apache.cassandra.cache;
  * 
  */
 
-
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.sun.jna.Memory;
+import org.apache.cassandra.io.util.Memory;
 
 public class FreeableMemory extends Memory
 {
@@ -58,23 +56,16 @@ public class FreeableMemory extends Memo
             free();
     }
 
-    private void free()
-    {
-        assert peer != 0;
-        super.finalize(); // calls free and sets peer to zero
-    }
-
-    /**
-     * avoid re-freeing already-freed memory
-     */
     @Override
-    protected void finalize()
+    protected void finalize() throws Throwable
     {
         assert references.get() <= 0;
         assert peer == 0;
+        super.finalize();
     }
-    
-    public byte getValidByte(long offset)
+
+    @Override
+    public byte getByte(long offset)
     {
         assert peer != 0;
         return super.getByte(offset);

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java Wed Oct  5 22:59:31 2011
@@ -20,26 +20,11 @@ package org.apache.cassandra.cache;
  * 
  */
 
-import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 
-import com.sun.jna.Memory;
-
 public class SerializingCacheProvider implements IRowCacheProvider
 {
-    public SerializingCacheProvider() throws ConfigurationException
-    {
-        try
-        {
-            Memory.class.getName();
-        }
-        catch (NoClassDefFoundError e)
-        {
-            throw new ConfigurationException("Cannot initialize SerializationCache without JNA in the class path");
-        }
-    }
-
     public ICache<DecoratedKey, ColumnFamily> create(int capacity, String tableName, String cfName)
     {
         return new SerializingCache<DecoratedKey, ColumnFamily>(capacity, ColumnFamily.serializer(), tableName, cfName);

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed Oct  5 22:59:31 2011
@@ -67,7 +67,7 @@ public final class CFMetaData
     public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
     public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
     public final static double DEFAULT_MERGE_SHARDS_CHANCE = 0.1;
-    public final static IRowCacheProvider DEFAULT_ROW_CACHE_PROVIDER = initDefaultRowCacheProvider();
+    public final static IRowCacheProvider DEFAULT_ROW_CACHE_PROVIDER = new SerializingCacheProvider();
     public final static String DEFAULT_COMPACTION_STRATEGY_CLASS = "SizeTieredCompactionStrategy";
     public final static ByteBuffer DEFAULT_KEY_NAME = ByteBufferUtil.bytes("KEY");
 
@@ -97,18 +97,6 @@ public final class CFMetaData
         }
     }
 
-    private static IRowCacheProvider initDefaultRowCacheProvider()
-    {
-        try
-        {
-            return new SerializingCacheProvider();
-        }
-        catch (ConfigurationException e)
-        {
-            return new ConcurrentLinkedHashCacheProvider();
-        }
-    }
-
     //REQUIRED
     public final Integer cfId;                        // internal id, never exposed to user
     public final String ksName;                       // name of keyspace

Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java?rev=1179467&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java Wed Oct  5 22:59:31 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.io.util;
+
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+
+public class Memory
+{
+    private static final Unsafe unsafe;
+
+    static
+    {
+        try
+        {
+            Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+            field.setAccessible(true);
+            unsafe = (sun.misc.Unsafe) field.get(null);
+        }
+        catch (Exception e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    protected long peer;
+    // size of the memory region
+    private final long size;
+
+    protected Memory(long bytes)
+    {
+        size = bytes;
+        peer = unsafe.allocateMemory(size);
+    }
+
+    public static Memory allocate(long bytes)
+    {
+        if (bytes < 0)
+            throw new IllegalArgumentException();
+
+        return new Memory(bytes);
+    }
+
+    public void setByte(long offset, byte b)
+    {
+        checkPosition(offset);
+        unsafe.putByte(peer + offset, b);
+    }
+
+    /**
+     * Transfers count bytes from buffer to Memory
+     *
+     * @param memoryOffset start offset in the memory
+     * @param buffer the data buffer
+     * @param bufferOffset start offset of the buffer
+     * @param count number of bytes to transfer
+     */
+    public void setBytes(long memoryOffset, byte[] buffer, int bufferOffset, int count)
+    {
+        if (buffer == null)
+            throw new NullPointerException();
+        else if (bufferOffset < 0
+                 || count < 0
+                 || bufferOffset + count > buffer.length)
+            throw new IndexOutOfBoundsException();
+        else if (count == 0)
+            return;
+
+        checkPosition(memoryOffset);
+        long end = memoryOffset + count;
+        checkPosition(end - 1);
+        while (memoryOffset < end)
+            unsafe.putByte(peer + memoryOffset++, buffer[bufferOffset++]);
+    }
+
+    public byte getByte(long offset)
+    {
+        checkPosition(offset);
+        return unsafe.getByte(peer + offset);
+    }
+
+    /**
+     * Transfers count bytes from Memory starting at memoryOffset to buffer starting at bufferOffset
+     *
+     * @param memoryOffset start offset in the memory
+     * @param buffer the data buffer
+     * @param bufferOffset start offset of the buffer
+     * @param count number of bytes to transfer
+     */
+    public void getBytes(long memoryOffset, byte[] buffer, int bufferOffset, int count)
+    {
+        if (buffer == null)
+            throw new NullPointerException();
+        else if (bufferOffset < 0 || count < 0 || count > buffer.length - bufferOffset)
+            throw new IndexOutOfBoundsException();
+        else if (count == 0)
+            return;
+
+        checkPosition(memoryOffset);
+        long end = memoryOffset + count;
+        checkPosition(end - 1);
+        while (memoryOffset < end)
+            buffer[bufferOffset++] = unsafe.getByte(peer + memoryOffset++);
+    }
+
+    private void checkPosition(long offset)
+    {
+        if (peer == 0)
+            throw new IllegalStateException("Memory was freed");
+
+        if (offset < 0 || offset >= size)
+            throw new IndexOutOfBoundsException("Illegal offset: " + offset + ", size: " + size);
+    }
+
+    public void free()
+    {
+        assert peer != 0;
+        unsafe.freeMemory(peer);
+        peer = 0;
+    }
+
+    @Override
+    protected void finalize() throws Throwable
+    {
+        try
+        {
+            if (peer != 0)
+                free();
+        }
+        finally
+        {
+            super.finalize();
+        }
+    }
+
+    public long size()
+    {
+        return size;
+    }
+}
+

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java Wed Oct  5 22:59:31 2011
@@ -38,9 +38,15 @@ public class MemoryInputStream extends A
     
     public int read() throws IOException
     {       
-        return mem.getValidByte(position++) & 0xFF;
+        return mem.getByte(position++) & 0xFF;
     }
-    
+
+    public void readFully(byte[] buffer, int offset, int count) throws IOException
+    {
+        mem.getBytes(position, buffer, offset, count);
+        position += count;
+    }
+
     protected void seekInternal(int pos)
     {
         position = pos;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java Wed Oct  5 22:59:31 2011
@@ -21,10 +21,9 @@ package org.apache.cassandra.io.util;
  */
 
 
+import java.io.IOException;
 import java.io.OutputStream;
 
-import com.sun.jna.Memory;
-
 /**
  * This class provides a way to stream the writes into the {@link Memory}
  */
@@ -39,16 +38,20 @@ public class MemoryOutputStream extends 
         this.mem = mem;
     }
     
-    @Override
     public void write(int b)
     {
-        mem.setByte(this.position, (byte)b);
-        this.position++;
+        mem.setByte(position++, (byte) b);
     }
-    
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        mem.setBytes(position, b, off, len);
+        position += len;
+    }
+
     public int position()
     {
-        return this.position;
+        return position;
     }
-    
 }

Modified: cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml (original)
+++ cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml Wed Oct  5 22:59:31 2011
@@ -544,13 +544,12 @@ commands:
 
         Supported values are:
             - ConcurrentLinkedHashCacheProvider
-            - SerializingCacheProvider (requires JNA)
+            - SerializingCacheProvider
 
         It is also valid to specify the fully-qualified class name to a class
         that implements org.apache.cassandra.cache.IRowCacheProvider.
 
-        row_cache_provider defaults to SerializingCacheProvider if you have JNA
-        enabled, otherwise ConcurrentLinkedHashCacheProvider.
+        row_cache_provider defaults to SerializingCacheProvider.
         SerializingCacheProvider serialises the contents of the row and stores
         it in native memory, i.e., off the JVM Heap. Serialized rows take
         significantly less memory than "live" rows in the JVM, so you can cache
@@ -805,19 +804,17 @@ commands:
 
         Supported values are:
             - ConcurrentLinkedHashCacheProvider
-            - SerializingCacheProvider (requires JNA)
+            - SerializingCacheProvider
 
         It is also valid to specify the fully-qualified class name to a class
         that implements org.apache.cassandra.cache.IRowCacheProvider.
 
-        row_cache_provider defaults to ConcurrentLinkedHashCacheProvider, 
-        but if you have JNA installed you should usually use
-        SerializingCacheProvider, which serialises the contents of the
-        row and stores it in native memory, i.e., off the JVM
-        Heap. Serialized rows take significantly less memory than
-        "live" rows in the JVM, so you can cache more rows in a given
-        memory footprint.  And storing the cache off-heap means you
-        can use smaller heap sizes, reducing the impact of GC pauses.
+        row_cache_provider defaults to SerializingCacheProvider.
+        SerializingCacheProvider serialises the contents of the row and stores
+        it in native memory, i.e., off the JVM Heap. Serialized rows take
+        significantly less memory than "live" rows in the JVM, so you can cache
+        more rows in a given memory footprint.  And storing the cache off-heap
+        means you can use smaller heap sizes, reducing the impact of GC pauses.
 
         - compression_options: Options related to compression.
           Options have the form {key:value}.

Modified: cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java Wed Oct  5 22:59:31 2011
@@ -117,6 +117,6 @@ public class CacheProviderTest extends S
         ICache<String, ColumnFamily> cache = new SerializingCache<String, ColumnFamily>(CAPACITY, ColumnFamily.serializer(), tableName, cfName);
         ColumnFamily cf = createCF();
         simpleCase(cf, cache);
-        // concurrentCase(cf, cache);
+        concurrentCase(cf, cache);
     }
 }


Reply via email to