Author: xedin Date: Wed Oct 5 22:59:31 2011 New Revision: 1179467 URL: http://svn.apache.org/viewvc?rev=1179467&view=rev Log: off-heap cache to use sun.misc.Unsafe instead of JNA patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3271
Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1179467&r1=1179466&r2=1179467&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Wed Oct 5 22:59:31 2011 @@ -3,7 +3,7 @@ * Thrift sockets are not properly buffered (CASSANDRA-3261) * performance improvement for bytebufferutil compare function (CASSANDRA-3286) * add system.versions ColumnFamily (CASSANDRA-3140) - + * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271) 1.0.0-final * fix bug preventing obsolete commitlog segments from being removed Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java?rev=1179467&r1=1179466&r2=1179467&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java Wed Oct 5 22:59:31 2011 @@ -20,11 +20,9 @@ package org.apache.cassandra.cache; * */ - -import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -import com.sun.jna.Memory; +import org.apache.cassandra.io.util.Memory; public class FreeableMemory extends Memory { @@ -58,23 +56,16 @@ public class FreeableMemory extends Memo free(); } - private void free() - { - assert peer != 0; - super.finalize(); // calls free and sets peer to zero - } - - /** - * avoid re-freeing already-freed memory - */ @Override - protected void finalize() + protected void finalize() throws Throwable { assert references.get() <= 0; assert peer == 0; + super.finalize(); } - - public byte getValidByte(long offset) + + @Override + public byte getByte(long offset) { assert peer != 0; return super.getByte(offset); Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java?rev=1179467&r1=1179466&r2=1179467&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java Wed Oct 5 22:59:31 2011 @@ -20,26 +20,11 @@ package org.apache.cassandra.cache; * */ -import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.DecoratedKey; -import com.sun.jna.Memory; - public class SerializingCacheProvider implements IRowCacheProvider { - public SerializingCacheProvider() throws ConfigurationException - { - try - { - Memory.class.getName(); - } - catch (NoClassDefFoundError e) - { - throw new ConfigurationException("Cannot initialize SerializationCache without JNA in the class path"); - } - } - public ICache<DecoratedKey, ColumnFamily> create(int capacity, String tableName, String cfName) { return new SerializingCache<DecoratedKey, ColumnFamily>(capacity, ColumnFamily.serializer(), tableName, cfName); Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1179467&r1=1179466&r2=1179467&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed Oct 5 22:59:31 2011 @@ -67,7 +67,7 @@ public final class CFMetaData public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4; public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32; public final static double DEFAULT_MERGE_SHARDS_CHANCE = 0.1; - public final static IRowCacheProvider DEFAULT_ROW_CACHE_PROVIDER = initDefaultRowCacheProvider(); + public final static IRowCacheProvider DEFAULT_ROW_CACHE_PROVIDER = new SerializingCacheProvider(); public final static String DEFAULT_COMPACTION_STRATEGY_CLASS = "SizeTieredCompactionStrategy"; public final static ByteBuffer DEFAULT_KEY_NAME = ByteBufferUtil.bytes("KEY"); @@ -97,18 +97,6 @@ public final class CFMetaData } } - private static IRowCacheProvider initDefaultRowCacheProvider() - { - try - { - return new SerializingCacheProvider(); - } - catch (ConfigurationException e) - { - return new ConcurrentLinkedHashCacheProvider(); - } - } - //REQUIRED public final Integer cfId; // internal id, never exposed to user public final String ksName; // name of keyspace Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java?rev=1179467&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java Wed Oct 5 22:59:31 2011 @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.cassandra.io.util; + +import sun.misc.Unsafe; + +import java.lang.reflect.Field; + +public class Memory +{ + private static final Unsafe unsafe; + + static + { + try + { + Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + unsafe = (sun.misc.Unsafe) field.get(null); + } + catch (Exception e) + { + throw new AssertionError(e); + } + } + + protected long peer; + // size of the memory region + private final long size; + + protected Memory(long bytes) + { + size = bytes; + peer = unsafe.allocateMemory(size); + } + + public static Memory allocate(long bytes) + { + if (bytes < 0) + throw new IllegalArgumentException(); + + return new Memory(bytes); + } + + public void setByte(long offset, byte b) + { + checkPosition(offset); + unsafe.putByte(peer + offset, b); + } + + /** + * Transfers count bytes from buffer to Memory + * + * @param memoryOffset start offset in the memory + * @param buffer the data buffer + * @param bufferOffset start offset of the buffer + * @param count number of bytes to transfer + */ + public void setBytes(long memoryOffset, byte[] buffer, int bufferOffset, int count) + { + if (buffer == null) + throw new NullPointerException(); + else if (bufferOffset < 0 + || count < 0 + || bufferOffset + count > buffer.length) + throw new IndexOutOfBoundsException(); + else if (count == 0) + return; + + checkPosition(memoryOffset); + long end = memoryOffset + count; + checkPosition(end - 1); + while (memoryOffset < end) + unsafe.putByte(peer + memoryOffset++, buffer[bufferOffset++]); + } + + public byte getByte(long offset) + { + checkPosition(offset); + return unsafe.getByte(peer + offset); + } + + /** + * Transfers count bytes from Memory starting at memoryOffset to buffer starting at bufferOffset + * + * @param memoryOffset start offset in the memory + * @param buffer the data buffer + * @param bufferOffset start offset of the buffer + * @param count number of bytes to transfer + */ + public void getBytes(long memoryOffset, byte[] buffer, int bufferOffset, int count) + { + if (buffer == null) + throw new NullPointerException(); + else if (bufferOffset < 0 || count < 0 || count > buffer.length - bufferOffset) + throw new IndexOutOfBoundsException(); + else if (count == 0) + return; + + checkPosition(memoryOffset); + long end = memoryOffset + count; + checkPosition(end - 1); + while (memoryOffset < end) + buffer[bufferOffset++] = unsafe.getByte(peer + memoryOffset++); + } + + private void checkPosition(long offset) + { + if (peer == 0) + throw new IllegalStateException("Memory was freed"); + + if (offset < 0 || offset >= size) + throw new IndexOutOfBoundsException("Illegal offset: " + offset + ", size: " + size); + } + + public void free() + { + assert peer != 0; + unsafe.freeMemory(peer); + peer = 0; + } + + @Override + protected void finalize() throws Throwable + { + try + { + if (peer != 0) + free(); + } + finally + { + super.finalize(); + } + } + + public long size() + { + return size; + } +} + Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java?rev=1179467&r1=1179466&r2=1179467&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java Wed Oct 5 22:59:31 2011 @@ -38,9 +38,15 @@ public class MemoryInputStream extends A public int read() throws IOException { - return mem.getValidByte(position++) & 0xFF; + return mem.getByte(position++) & 0xFF; } - + + public void readFully(byte[] buffer, int offset, int count) throws IOException + { + mem.getBytes(position, buffer, offset, count); + position += count; + } + protected void seekInternal(int pos) { position = pos; Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java?rev=1179467&r1=1179466&r2=1179467&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java Wed Oct 5 22:59:31 2011 @@ -21,10 +21,9 @@ package org.apache.cassandra.io.util; */ +import java.io.IOException; import java.io.OutputStream; -import com.sun.jna.Memory; - /** * This class provides a way to stream the writes into the {@link Memory} */ @@ -39,16 +38,20 @@ public class MemoryOutputStream extends this.mem = mem; } - @Override public void write(int b) { - mem.setByte(this.position, (byte)b); - this.position++; + mem.setByte(position++, (byte) b); } - + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + mem.setBytes(position, b, off, len); + position += len; + } + public int position() { - return this.position; + return position; } - } Modified: cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml?rev=1179467&r1=1179466&r2=1179467&view=diff ============================================================================== --- cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml (original) +++ cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml Wed Oct 5 22:59:31 2011 @@ -544,13 +544,12 @@ commands: Supported values are: - ConcurrentLinkedHashCacheProvider - - SerializingCacheProvider (requires JNA) + - SerializingCacheProvider It is also valid to specify the fully-qualified class name to a class that implements org.apache.cassandra.cache.IRowCacheProvider. - row_cache_provider defaults to SerializingCacheProvider if you have JNA - enabled, otherwise ConcurrentLinkedHashCacheProvider. + row_cache_provider defaults to SerializingCacheProvider. SerializingCacheProvider serialises the contents of the row and stores it in native memory, i.e., off the JVM Heap. Serialized rows take significantly less memory than "live" rows in the JVM, so you can cache @@ -805,19 +804,17 @@ commands: Supported values are: - ConcurrentLinkedHashCacheProvider - - SerializingCacheProvider (requires JNA) + - SerializingCacheProvider It is also valid to specify the fully-qualified class name to a class that implements org.apache.cassandra.cache.IRowCacheProvider. - row_cache_provider defaults to ConcurrentLinkedHashCacheProvider, - but if you have JNA installed you should usually use - SerializingCacheProvider, which serialises the contents of the - row and stores it in native memory, i.e., off the JVM - Heap. Serialized rows take significantly less memory than - "live" rows in the JVM, so you can cache more rows in a given - memory footprint. And storing the cache off-heap means you - can use smaller heap sizes, reducing the impact of GC pauses. + row_cache_provider defaults to SerializingCacheProvider. + SerializingCacheProvider serialises the contents of the row and stores + it in native memory, i.e., off the JVM Heap. Serialized rows take + significantly less memory than "live" rows in the JVM, so you can cache + more rows in a given memory footprint. And storing the cache off-heap + means you can use smaller heap sizes, reducing the impact of GC pauses. - compression_options: Options related to compression. Options have the form {key:value}. Modified: cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java?rev=1179467&r1=1179466&r2=1179467&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java Wed Oct 5 22:59:31 2011 @@ -117,6 +117,6 @@ public class CacheProviderTest extends S ICache<String, ColumnFamily> cache = new SerializingCache<String, ColumnFamily>(CAPACITY, ColumnFamily.serializer(), tableName, cfName); ColumnFamily cf = createCF(); simpleCase(cf, cache); - // concurrentCase(cf, cache); + concurrentCase(cf, cache); } }