Added: cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java?rev=1163801&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java Wed Aug 31 20:30:20 2011
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import org.apache.cassandra.config.ConfigurationException;
+
+public class CompressionParameters
+{
+    public final static int DEFAULT_CHUNK_LENGTH = 65536;
+    public static final String CHUNK_LENGTH_PARAMETER = "chunk_length_kb";
+
+    public final Class<? extends ICompressor> compressorClass;
+    public final Map<String, String> compressionOptions;
+
+    public final transient ICompressor compressor;
+    public final transient int chunkLength;
+
+    public CompressionParameters(CharSequence compressorClassName, Map<? extends CharSequence, ? extends CharSequence> options) throws ConfigurationException
+    {
+        this(compressorClassName, copyOptions(options), -1);
+    }
+
+    public CompressionParameters(CharSequence compressorClassName, Map<String, String> options, int chunkLength) throws ConfigurationException
+    {
+        this(createCompressor(parseCompressorClass(compressorClassName), options), options, chunkLength < 0 ? getChunkLength(options) : chunkLength);
+        validateChunkLength();
+    }
+
+    public CompressionParameters(ICompressor compressor)
+    {
+        this(compressor, null, DEFAULT_CHUNK_LENGTH);
+    }
+
+    public CompressionParameters(ICompressor compressor, Map<String, String> compressionOptions, int chunkLength)
+    {
+        this.compressorClass = compressor == null ? null : compressor.getClass();
+        this.compressionOptions = compressor == null ? null : (compressionOptions == null ? Collections.<String, String>emptyMap() : compressionOptions);
+        this.chunkLength = chunkLength;
+        this.compressor = compressor;
+    }
+
+    private static Class<? extends ICompressor> parseCompressorClass(CharSequence cc) throws ConfigurationException
+    {
+        if (cc == null)
+            return null;
+
+        String className = cc.toString();
+        className = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className;
+        try
+        {
+            return (Class<? extends ICompressor>)Class.forName(className);
+        }
+        catch (Exception e)
+        {
+            throw new ConfigurationException("Could not create Compression for type " + cc.toString(), e);
+        }
+    }
+
+    private static ICompressor createCompressor(Class<? extends ICompressor> compressorClass, Map<String, String> compressionOptions) throws ConfigurationException
+    {
+        if (compressorClass == null)
+            return null;
+
+        try
+        {
+            Method method = compressorClass.getMethod("create", Map.class);
+            return (ICompressor)method.invoke(null, compressionOptions);
+        }
+        catch (NoSuchMethodException e)
+        {
+            throw new ConfigurationException("create method not found", e);
+        }
+        catch (SecurityException e)
+        {
+            throw new ConfigurationException("Access forbidden", e);
+        }
+        catch (IllegalAccessException e)
+        {
+            throw new ConfigurationException("Cannot access method create in " + compressorClass.getName(), e);
+        }
+        catch (InvocationTargetException e)
+        {
+            throw new ConfigurationException(compressorClass.getSimpleName() + ".create() threw an error", e);
+        }
+        catch (ExceptionInInitializerError e)
+        {
+            throw new ConfigurationException("Cannot initialize class " + compressorClass.getName());
+        }
+    }
+
+    private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co)
+    {
+        if (co == null || co.isEmpty())
+            return Collections.<String, String>emptyMap();
+
+        Map<String, String> compressionOptions = new HashMap<String, String>();
+        for (Map.Entry<? extends CharSequence, ? extends CharSequence> entry : co.entrySet())
+        {
+            compressionOptions.put(entry.getKey().toString(), entry.getValue().toString());
+        }
+        return compressionOptions;
+    }
+
+    private static int getChunkLength(Map<String, String> options) throws ConfigurationException
+    {
+        int chunkLength = DEFAULT_CHUNK_LENGTH;
+        if (options != null && options.containsKey(CHUNK_LENGTH_PARAMETER))
+        {
+            try
+            {
+                chunkLength = Integer.parseInt(options.get(CHUNK_LENGTH_PARAMETER));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_PARAMETER, e);
+            }
+        }
+        return chunkLength;
+    }
+
+    // chunkLength must be a power of 2 because we assume so when
+    // computing the chunk number from an uncompressed file offset (see
+    // CompressedRandomAccessReader.decompressChunk())
+    private void validateChunkLength() throws ConfigurationException
+    {
+        if (chunkLength <= 0)
+            throw new ConfigurationException("Invalid negative or zero " + CHUNK_LENGTH_PARAMETER);
+
+        int c = chunkLength;
+        boolean found = false;
+        while (c != 0)
+        {
+            if ((c & 0x01) != 0)
+            {
+                if (found)
+                    throw new ConfigurationException(CHUNK_LENGTH_PARAMETER + " must be a power of 2");
+                else
+                    found = true;
+            }
+            c >>= 1;
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj == this)
+        {
+            return true;
+        }
+        else if (obj == null || obj.getClass() != getClass())
+        {
+            return false;
+        }
+
+        CompressionParameters cp = (CompressionParameters) obj;
+        return new EqualsBuilder()
+            .append(compressorClass, cp.compressorClass)
+            .append(compressionOptions, cp.compressionOptions)
+            .isEquals();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return new HashCodeBuilder(29, 1597)
+            .append(compressorClass)
+            .append(compressionOptions)
+            .toHashCode();
+    }
+}
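For illustration, a minimal usage sketch of the class above (hypothetical, not part of this commit; it only uses the constructors and constants shown). Short compressor names are resolved against the org.apache.cassandra.io.compress package, and in this revision the chunk_length_kb value is parsed directly with Integer.parseInt and must be a positive power of 2:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.cassandra.config.ConfigurationException;
    import org.apache.cassandra.io.compress.CompressionParameters;

    public class CompressionParametersExample
    {
        public static void main(String[] args) throws ConfigurationException
        {
            // Short name, no options: resolves to org.apache.cassandra.io.compress.SnappyCompressor
            // and falls back to DEFAULT_CHUNK_LENGTH (65536)
            CompressionParameters snappy =
                new CompressionParameters("SnappyCompressor", Collections.<String, String>emptyMap());

            // Explicit chunk length option; a non-numeric or non-power-of-2 value
            // makes the constructor throw ConfigurationException
            Map<String, String> options = new HashMap<String, String>();
            options.put(CompressionParameters.CHUNK_LENGTH_PARAMETER, "131072");
            CompressionParameters deflate = new CompressionParameters("DeflateCompressor", options);
        }
    }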
Added: cassandra/trunk/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java?rev=1163801&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java Wed Aug 31 20:30:20 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+public class DeflateCompressor implements ICompressor
+{
+    public static final DeflateCompressor instance = new DeflateCompressor();
+
+    private final ThreadLocal<Deflater> deflater;
+    private final ThreadLocal<Inflater> inflater;
+
+    public static DeflateCompressor create(Map<String, String> compressionOptions)
+    {
+        // no specific options supported so far
+        return instance;
+    }
+
+    private DeflateCompressor()
+    {
+        deflater = new ThreadLocal<Deflater>()
+        {
+            @Override
+            protected Deflater initialValue()
+            {
+                return new Deflater();
+            }
+        };
+        inflater = new ThreadLocal<Inflater>()
+        {
+            @Override
+            protected Inflater initialValue()
+            {
+                return new Inflater();
+            }
+        };
+    }
+
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return chunkLength;
+    }
+
+    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
+    {
+        Deflater def = deflater.get();
+        def.reset();
+        def.setInput(input, inputOffset, inputLength);
+        def.finish();
+        if (def.needsInput())
+            return 0;
+
+        int offs = outputOffset;
+        while (true)
+        {
+            offs += def.deflate(output.buffer, offs, output.buffer.length - offs);
+            if (def.needsInput())
+            {
+                return offs - outputOffset;
+            }
+            else
+            {
+                // We're not done, output was too small. Increase it and continue
+                byte[] newBuffer = new byte[(output.buffer.length*4)/3 + 1];
+                System.arraycopy(output.buffer, 0, newBuffer, 0, offs);
+                output.buffer = newBuffer;
+            }
+        }
+    }
+
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        Inflater inf = inflater.get();
+        inf.reset();
+        inf.setInput(input, inputOffset, inputLength);
+        if (inf.needsInput())
+            return 0;
+
+        // We assume output is big enough
+        try
+        {
+            return inf.inflate(output, outputOffset, output.length - outputOffset);
+        }
+        catch (DataFormatException e)
+        {
+            throw new IOException(e);
+        }
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/io/compress/ICompressor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/ICompressor.java?rev=1163801&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/ICompressor.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/ICompressor.java Wed Aug 31 20:30:20 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface ICompressor
+{
+    public int initialCompressedBufferLength(int chunkLength);
+
+    public int compress(byte[] input, int inputOffset, int inputLength, WrappedArray output, int outputOffset) throws IOException;
+
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException;
+
+    /**
+     * A simple wrapper of a byte array.
+     * Not all implementations allow knowing the maximum size after
+     * compression. This makes it hard to size the output buffer for compress
+     * (and we want to reuse the buffer). Instead we use this wrapped buffer
+     * so that compress is free to resize the underlying array if
+     * need be.
+     */
+    public static class WrappedArray
+    {
+        public byte[] buffer;
+
+        public WrappedArray(byte[] buffer)
+        {
+            this.buffer = buffer;
+        }
+    }
+}
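To make the pluggability contract concrete, here is a hypothetical pass-through implementation (not part of this commit): CompressionParameters.createCompressor above looks up a public static create(Map) factory reflectively, and compress() is expected to grow the WrappedArray buffer when it runs out of room, as DeflateCompressor does.

    package org.apache.cassandra.io.compress; // hypothetical example only

    import java.io.IOException;
    import java.util.Map;

    public class IdentityCompressor implements ICompressor
    {
        public static final IdentityCompressor instance = new IdentityCompressor();

        // looked up reflectively by CompressionParameters.createCompressor
        public static IdentityCompressor create(Map<String, String> compressionOptions)
        {
            return instance;
        }

        public int initialCompressedBufferLength(int chunkLength)
        {
            return chunkLength;
        }

        public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
        {
            if (output.buffer.length < outputOffset + inputLength)
            {
                // resize the wrapped array, keeping what was already written
                byte[] newBuffer = new byte[outputOffset + inputLength];
                System.arraycopy(output.buffer, 0, newBuffer, 0, outputOffset);
                output.buffer = newBuffer;
            }
            System.arraycopy(input, inputOffset, output.buffer, outputOffset, inputLength);
            return inputLength;
        }

        public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
        {
            System.arraycopy(input, inputOffset, output, outputOffset, inputLength);
            return inputLength;
        }
    }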
Added: cassandra/trunk/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java?rev=1163801&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java Wed Aug 31 20:30:20 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.xerial.snappy.Snappy;
+
+public class SnappyCompressor implements ICompressor
+{
+    public static final SnappyCompressor instance = new SnappyCompressor();
+
+    public static SnappyCompressor create(Map<String, String> compressionOptions)
+    {
+        // no specific options supported so far
+        return instance;
+    }
+
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return Snappy.maxCompressedLength(chunkLength);
+    }
+
+    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
+    {
+        return Snappy.rawCompress(input, inputOffset, inputLength, output.buffer, outputOffset);
+    }
+
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        return Snappy.rawUncompress(input, inputOffset, inputLength, output, outputOffset);
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Wed Aug 31 20:30:20 2011
@@ -65,7 +65,7 @@ public class SSTableWriter extends SSTab
     private static Set<Component> components(CFMetaData metadata)
     {
         Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS));
-        if (metadata.useCompression())
+        if (metadata.compressionParameters().compressor != null)
             components.add(Component.COMPRESSION_INFO);
         return components;
     }
@@ -87,7 +87,8 @@ public class SSTableWriter extends SSTab
             dbuilder = SegmentedFile.getCompressedBuilder();
             dataFile = CompressedSequentialWriter.open(getFilename(),
                                                        descriptor.filenameFor(Component.COMPRESSION_INFO),
-                                                       true);
+                                                       true,
+                                                       metadata.compressionParameters());
         }
         else
         {
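A hypothetical round trip through the new interface (not part of this commit; it assumes the snappy-java native library is available at runtime), sizing the output buffer with the compressor's own worst-case estimate, much as the compressed writer path above is expected to do per chunk:

    import java.io.IOException;

    import org.apache.cassandra.io.compress.ICompressor;
    import org.apache.cassandra.io.compress.SnappyCompressor;

    public class CompressorRoundTrip
    {
        public static void main(String[] args) throws IOException
        {
            ICompressor compressor = SnappyCompressor.instance;
            byte[] data = "The quick brown fox jumps over the lazy dog".getBytes("UTF-8");

            // worst-case compressed size for this input, per the compressor
            byte[] compressed = new byte[compressor.initialCompressedBufferLength(data.length)];
            ICompressor.WrappedArray wrapped = new ICompressor.WrappedArray(compressed);
            int compressedLength = compressor.compress(data, 0, data.length, wrapped, 0);

            byte[] restored = new byte[data.length];
            int restoredLength = compressor.uncompress(wrapped.buffer, 0, compressedLength, restored, 0);
            assert restoredLength == data.length;
        }
    }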
Modified: cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml (original)
+++ cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml Wed Aug 31 20:30:20 2011
@@ -555,10 +555,25 @@ commands:
         SerializingCacheProvider serialises the contents of the row and
         stores the data off the JVM Heap. This may reduce the GC pressure.

-        NOTE: Thsi provider requires JNA.jar to be in the class path to
+        NOTE: This provider requires JNA.jar to be in the class path to
         enable native methods.

-        - compression: Use compression for SSTable data files. Accepts the values true and false.
+        - compression: Use compression for SSTable data files.
+
+          Supported values are:
+            - null: to disable compression
+            - SnappyCompressor: compression based on the Snappy algorithm
+            - DeflateCompressor: compression based on the deflate algorithm
+              (through Java native support)
+
+          It is also valid to specify the fully-qualified name of a class
+          that implements org.apache.cassandra.io.compress.ICompressor.
+
+        - compression_options: Optional additional options for compression.
+          Options have the form [{key:value}] and may depend on the
+          compression algorithm used. One generic option is chunk_length_kb,
+          which specifies the size in KB of the chunks used by compression
+          (defaults to 64; must be a power of 2).

     Examples:
     create column family Super4
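In the style of the existing "create column family" examples in CliHelp.yaml, the help text above implies CLI statements along these lines (illustrative only; exact quoting and syntax should be checked against the CLI grammar at this revision):

    create column family Compressed1
        with compression = SnappyCompressor
        and compression_options = [{chunk_length_kb: 64}];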
Modified: cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java Wed Aug 31 20:30:20 2011
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.thrift.IndexType;
@@ -259,7 +261,7 @@ public class SchemaLoader
     {
         for (CFMetaData cfm : ksm.cfMetaData().values())
         {
-            cfm.compression(true);
+            cfm.compressionParameters(new CompressionParameters(SnappyCompressor.instance));
         }
     }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java Wed Aug 31 20:30:20 2011
@@ -37,7 +37,7 @@ public class CompressedRandomAccessReade
     {
         // test reset in current buffer or previous one
        testResetAndTruncate(false, 10);
-        testResetAndTruncate(false, CompressedSequentialWriter.CHUNK_LENGTH);
+        testResetAndTruncate(false, CompressionParameters.DEFAULT_CHUNK_LENGTH);
     }

     @Test
@@ -45,7 +45,7 @@ public class CompressedRandomAccessReade
     {
         // test reset in current buffer or previous one
         testResetAndTruncate(true, 10);
-        testResetAndTruncate(true, CompressedSequentialWriter.CHUNK_LENGTH);
+        testResetAndTruncate(true, CompressionParameters.DEFAULT_CHUNK_LENGTH);
     }

     private void testResetAndTruncate(boolean compressed, int junkSize) throws IOException
@@ -56,8 +56,8 @@ public class CompressedRandomAccessReade
         try
         {
            SequentialWriter writer = compressed
-                ? new CompressedSequentialWriter(f, filename + ".metadata", false)
-                : new SequentialWriter(f, CompressedSequentialWriter.CHUNK_LENGTH, false);
+                ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance))
+                : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);

             writer.write("The quick ".getBytes());
             FileMark mark = writer.mark();
@@ -76,7 +76,7 @@ public class CompressedRandomAccessReade
         assert f.exists();
         RandomAccessReader reader = compressed
                                   ? new CompressedRandomAccessReader(filename, new CompressionMetadata(filename + ".metadata", f.length()), false)
-                                  : new RandomAccessReader(f, CompressedSequentialWriter.CHUNK_LENGTH, false);
+                                  : new RandomAccessReader(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
         String expected = "The quick brown fox jumps over the lazy dog";
         assert reader.length() == expected.length();
         byte[] b = new byte[expected.length()];
@@ -105,7 +105,7 @@ public class CompressedRandomAccessReade
         File metadata = new File(file.getPath() + ".meta");
         metadata.deleteOnExit();

-        SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false);
+        SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance));

         writer.write(CONTENT.getBytes());
         writer.close();

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java Wed Aug 31 20:30:20 2011
@@ -84,7 +84,7 @@ public class Session implements Serializ
         availableOptions.addOption("W",  "no-replicate-on-write", false, "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work");
         availableOptions.addOption("V",  "average-size-values",   false, "Generate column values of average rather than specific size");
         availableOptions.addOption("T",  "send-to",               true,  "Send this as a request to the stress daemon at specified address.");
-        availableOptions.addOption("I",  "compression",           false, "Use sstable compression when creating schema");
+        availableOptions.addOption("I",  "compression",           true,  "Specify the compression to use for sstables; default: no compression");
         availableOptions.addOption("Q",  "query-names",           true,  "Comma-separated list of column names to retrieve from each row.");
     }

@@ -101,7 +101,7 @@ public class Session implements Serializ
     private int retryTimes = 10;
     private int port = 9160;
     private int superColumns = 1;
-    private boolean compression = false;
+    private String compression = null;

     private int progressInterval = 10;
     private int keysPerCall = 1000;
@@ -272,7 +272,7 @@ public class Session implements Serializ
             replicateOnWrite = false;

         if (cmd.hasOption("I"))
-            compression = true;
+            compression = cmd.getOptionValue("I");

         averageSizeValues = cmd.hasOption("V");