Added: 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java?rev=1163801&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
 (added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
 Wed Aug 31 20:30:20 2011
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import org.apache.cassandra.config.ConfigurationException;
+
+/**
+ * Compression settings: the {@link ICompressor} in use, the options it was
+ * created with and the length (in bytes) of the chunks data is compressed in.
+ *
+ * A {@code null} compressor is allowed; in that case {@link #compressorClass}
+ * and {@link #compressionOptions} are {@code null} as well.
+ *
+ * Equality and hash code only consider the compressor class and the options;
+ * the transient {@link #compressor} and {@link #chunkLength} fields are not
+ * compared.
+ */
+public class CompressionParameters
+{
+    /** Chunk length, in bytes, used when none is configured. */
+    public final static int DEFAULT_CHUNK_LENGTH = 65536;
+    /** Name of the option holding the chunk length, expressed in kilobytes. */
+    public static final String CHUNK_LENGTH_PARAMETER = "chunk_length_kb";
+
+    public final Class<? extends ICompressor> compressorClass;
+    public final Map<String, String> compressionOptions;
+
+    public final transient ICompressor compressor;
+    public final transient int chunkLength;
+
+    /**
+     * Builds the parameters from a compressor class name and user supplied
+     * options. The chunk length is read from the options (or defaulted).
+     *
+     * @param compressorClassName simple or fully-qualified compressor class name, or null for no compressor
+     * @param options compression options, may be null or empty
+     * @throws ConfigurationException if the compressor cannot be created or the chunk length is invalid
+     */
+    public CompressionParameters(CharSequence compressorClassName, Map<? extends CharSequence, ? extends CharSequence> options) throws ConfigurationException
+    {
+        this(compressorClassName, copyOptions(options), -1);
+    }
+
+    /**
+     * Builds the parameters from a compressor class name, its options and a
+     * chunk length.
+     *
+     * @param compressorClassName simple or fully-qualified compressor class name, or null for no compressor
+     * @param options compression options, may be null
+     * @param chunkLength chunk length in bytes; a negative value means "read it from the options"
+     * @throws ConfigurationException if the compressor cannot be created or the chunk length is invalid
+     */
+    public CompressionParameters(CharSequence compressorClassName, Map<String, String> options, int chunkLength) throws ConfigurationException
+    {
+        this(createCompressor(parseCompressorClass(compressorClassName), options), options, chunkLength < 0 ? getChunkLength(options) : chunkLength);
+        validateChunkLength();
+    }
+
+    /**
+     * Builds the parameters for an existing compressor, with no options and
+     * the default chunk length.
+     *
+     * @param compressor the compressor to use, may be null
+     */
+    public CompressionParameters(ICompressor compressor)
+    {
+        this(compressor, null, DEFAULT_CHUNK_LENGTH);
+    }
+
+    /**
+     * Builds the parameters for an existing compressor. The chunk length is
+     * stored as given and is not validated by this constructor.
+     *
+     * @param compressor the compressor to use, may be null
+     * @param compressionOptions the options of the compressor; null is replaced by an empty map
+     *                           when a compressor is given
+     * @param chunkLength chunk length in bytes
+     */
+    public CompressionParameters(ICompressor compressor, Map<String, String> compressionOptions, int chunkLength)
+    {
+        this.compressorClass = compressor == null ? null : compressor.getClass();
+        this.compressionOptions = compressor == null ? null : (compressionOptions == null ? Collections.<String, String>emptyMap() : compressionOptions);
+        this.chunkLength = chunkLength;
+        this.compressor = compressor;
+    }
+
+    /**
+     * Resolves a compressor class from its name. A name without a dot is
+     * looked up in the org.apache.cassandra.io.compress package.
+     *
+     * @param cc the class name, or null
+     * @return the compressor class, or null if cc is null
+     * @throws ConfigurationException if the class cannot be loaded or does not implement ICompressor
+     */
+    private static Class<? extends ICompressor> parseCompressorClass(CharSequence cc) throws ConfigurationException
+    {
+        if (cc == null)
+            return null;
+
+        String className = cc.toString();
+        className = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className;
+        try
+        {
+            // asSubclass rejects classes that are not compressors here, with a
+            // ConfigurationException, instead of letting an unchecked cast through
+            return Class.forName(className).asSubclass(ICompressor.class);
+        }
+        catch (Exception e)
+        {
+            throw new ConfigurationException("Could not create Compression for type " + cc.toString(), e);
+        }
+    }
+
+    /**
+     * Instantiates a compressor by reflectively calling the static
+     * {@code create(Map)} factory method of the given class.
+     *
+     * @param compressorClass the compressor class, or null
+     * @param compressionOptions the options handed to the factory method
+     * @return the compressor, or null if compressorClass is null
+     * @throws ConfigurationException if the factory method is missing, inaccessible or fails
+     */
+    private static ICompressor createCompressor(Class<? extends ICompressor> compressorClass, Map<String, String> compressionOptions) throws ConfigurationException
+    {
+        if (compressorClass == null)
+            return null;
+
+        try
+        {
+            Method method = compressorClass.getMethod("create", Map.class);
+            return (ICompressor)method.invoke(null, compressionOptions);
+        }
+        catch (NoSuchMethodException e)
+        {
+            throw new ConfigurationException("create method not found", e);
+        }
+        catch (SecurityException e)
+        {
+            throw new ConfigurationException("Access forbiden", e);
+        }
+        catch (IllegalAccessException e)
+        {
+            throw new ConfigurationException("Cannot access method create in " + compressorClass.getName(), e);
+        }
+        catch (InvocationTargetException e)
+        {
+            throw new ConfigurationException(compressorClass.getSimpleName() + ".create() throwed an error", e);
+        }
+        catch (ExceptionInInitializerError e)
+        {
+            // keep the original failure attached so the root cause is not lost
+            ConfigurationException ce = new ConfigurationException("Cannot initialize class " + compressorClass.getName());
+            ce.initCause(e);
+            throw ce;
+        }
+    }
+
+    /**
+     * Copies the given options into a map of plain strings.
+     *
+     * @param co the options, may be null
+     * @return an empty map if co is null or empty, a new map otherwise
+     */
+    private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co)
+    {
+        if (co == null || co.isEmpty())
+            return Collections.<String, String>emptyMap();
+
+        Map<String, String> compressionOptions = new HashMap<String, String>();
+        for (Map.Entry<? extends CharSequence, ? extends CharSequence> entry : co.entrySet())
+        {
+            compressionOptions.put(entry.getKey().toString(), entry.getValue().toString());
+        }
+        return compressionOptions;
+    }
+
+    /**
+     * Reads the chunk length from the options. The option is expressed in
+     * kilobytes and is converted to bytes.
+     *
+     * @param options the compression options, may be null
+     * @return the chunk length in bytes, DEFAULT_CHUNK_LENGTH if the option is absent
+     * @throws ConfigurationException if the value is not an integer, is not strictly positive
+     *                                or is too large to be expressed in bytes
+     */
+    private static int getChunkLength(Map<String, String> options) throws ConfigurationException
+    {
+        if (options == null || !options.containsKey(CHUNK_LENGTH_PARAMETER))
+            return DEFAULT_CHUNK_LENGTH;
+
+        int chunkLengthInKb;
+        try
+        {
+            chunkLengthInKb = Integer.parseInt(options.get(CHUNK_LENGTH_PARAMETER));
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_PARAMETER, e);
+        }
+
+        // reject bad values before multiplying: the product could otherwise
+        // wrap around to a value that looks like a valid chunk length
+        if (chunkLengthInKb <= 0)
+            throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_PARAMETER);
+        if (chunkLengthInKb > Integer.MAX_VALUE / 1024)
+            throw new ConfigurationException("Value of " + CHUNK_LENGTH_PARAMETER + " is too large (" + chunkLengthInKb + ")");
+
+        return chunkLengthInKb * 1024;
+    }
+
+    /**
+     * Checks that chunkLength is a strictly positive power of 2.
+     *
+     * chunkLength must be a power of 2 because we assume so when
+     * computing the chunk number from an uncompressed file offset (see
+     * CompressedRandomAccessReader.decompresseChunk())
+     *
+     * @throws ConfigurationException if chunkLength is not a strictly positive power of 2
+     */
+    private void validateChunkLength() throws ConfigurationException
+    {
+        if (chunkLength <= 0)
+            throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_PARAMETER);
+
+        // a positive power of 2 has a single bit set, so clearing its lowest
+        // set bit must leave nothing
+        if ((chunkLength & (chunkLength - 1)) != 0)
+            throw new ConfigurationException(CHUNK_LENGTH_PARAMETER + " must be a power of 2");
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj == this)
+        {
+            return true;
+        }
+        else if (obj == null || obj.getClass() != getClass())
+        {
+            return false;
+        }
+
+        CompressionParameters cp = (CompressionParameters) obj;
+        return new EqualsBuilder()
+            .append(compressorClass, cp.compressorClass)
+            .append(compressionOptions, cp.compressionOptions)
+            .isEquals();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return new HashCodeBuilder(29, 1597)
+            .append(compressorClass)
+            .append(compressionOptions)
+            .toHashCode();
+    }
+}

Added: 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java?rev=1163801&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
 (added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
 Wed Aug 31 20:30:20 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+/**
+ * {@link ICompressor} based on the deflate support of java.util.zip.
+ *
+ * A single shared instance is used; the Deflater and Inflater it relies on
+ * are kept in thread-locals so that each thread works with its own pair.
+ */
+public class DeflateCompressor implements ICompressor
+{
+    public static final DeflateCompressor instance = new DeflateCompressor();
+
+    private final ThreadLocal<Deflater> deflater;
+    private final ThreadLocal<Inflater> inflater;
+
+    /**
+     * Factory method looked up by name when a compressor is configured.
+     *
+     * @param compressionOptions the configured options (currently ignored)
+     * @return the shared instance
+     */
+    public static DeflateCompressor create(Map<String, String> compressionOptions)
+    {
+        // no specific options supported so far
+        return instance;
+    }
+
+    private DeflateCompressor()
+    {
+        deflater = new ThreadLocal<Deflater>()
+        {
+            @Override
+            protected Deflater initialValue()
+            {
+                return new Deflater();
+            }
+        };
+        inflater = new ThreadLocal<Inflater>()
+        {
+            @Override
+            protected Inflater initialValue()
+            {
+                return new Inflater();
+            }
+        };
+    }
+
+    /**
+     * @param chunkLength the length of the uncompressed chunks
+     * @return chunkLength; compress() grows the buffer itself when that is not enough
+     */
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return chunkLength;
+    }
+
+    /**
+     * Compresses inputLength bytes of input, starting at inputOffset, into
+     * output.buffer starting at outputOffset. The wrapped buffer is replaced
+     * by a larger one (content preserved) if it is too small.
+     *
+     * @return the number of compressed bytes written, 0 if there is no input
+     */
+    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
+    {
+        Deflater def = deflater.get();
+        def.reset();
+        def.setInput(input, inputOffset, inputLength);
+        def.finish();
+        if (def.needsInput())
+            return 0;
+
+        int offs = outputOffset;
+        while (true)
+        {
+            offs += def.deflate(output.buffer, offs, output.buffer.length - offs);
+            // finished(), not needsInput(): the deflater may have consumed all
+            // of its input while still holding compressed bytes that did not
+            // fit in the output buffer; stopping then would truncate the data
+            if (def.finished())
+            {
+                return offs - outputOffset;
+            }
+            else
+            {
+                // We're not done, output was too small. Increase it and continue
+                byte[] newBuffer = new byte[(output.buffer.length*4)/3 + 1];
+                System.arraycopy(output.buffer, 0, newBuffer, 0, offs);
+                output.buffer = newBuffer;
+            }
+        }
+    }
+
+    /**
+     * Uncompresses inputLength bytes of input, starting at inputOffset, into
+     * output starting at outputOffset. The output array is not resized.
+     *
+     * @return the number of uncompressed bytes written, 0 if there is no input
+     * @throws IOException if the input is not valid deflate data
+     */
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        Inflater inf = inflater.get();
+        inf.reset();
+        inf.setInput(input, inputOffset, inputLength);
+        if (inf.needsInput())
+            return 0;
+
+        // We assume output is big enough
+        try
+        {
+            return inf.inflate(output, outputOffset, output.length - outputOffset);
+        }
+        catch (DataFormatException e)
+        {
+            throw new IOException(e);
+        }
+    }
+}

Added: 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/ICompressor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/ICompressor.java?rev=1163801&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/ICompressor.java 
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/ICompressor.java 
Wed Aug 31 20:30:20 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A compression algorithm able to compress and uncompress byte ranges.
+ */
+public interface ICompressor
+{
+    /**
+     * @param chunkLength the length of the uncompressed chunks
+     * @return the length to initially give to the buffer receiving compressed data
+     */
+    int initialCompressedBufferLength(int chunkLength);
+
+    /**
+     * Compresses inputLength bytes of input, starting at inputOffset, into the
+     * array wrapped by output, starting at outputOffset. An implementation may
+     * replace the wrapped array by a larger one.
+     *
+     * @return the number of compressed bytes written
+     */
+    int compress(byte[] input, int inputOffset, int inputLength, WrappedArray output, int outputOffset) throws IOException;
+
+    /**
+     * Uncompresses inputLength bytes of input, starting at inputOffset, into
+     * output, starting at outputOffset.
+     *
+     * @return the number of uncompressed bytes written
+     */
+    int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException;
+
+    /**
+     * A simple holder for a byte array.
+     * Not every implementation can tell the maximum size of its compressed
+     * output, which makes it hard to size the output buffer of compress
+     * (a buffer we also want to reuse). Passing this holder instead of a bare
+     * array gives compress the liberty to swap in a bigger array if need be.
+     */
+    class WrappedArray
+    {
+        public byte[] buffer;
+
+        public WrappedArray(byte[] buffer)
+        {
+            this.buffer = buffer;
+        }
+    }
+}

Added: 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java?rev=1163801&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java 
(added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java 
Wed Aug 31 20:30:20 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.xerial.snappy.Snappy;
+
+/**
+ * {@link ICompressor} delegating to the Snappy library.
+ */
+public class SnappyCompressor implements ICompressor
+{
+    public static final SnappyCompressor instance = new SnappyCompressor();
+
+    /**
+     * Factory method looked up by name when a compressor is configured.
+     *
+     * @param compressionOptions the configured options (currently ignored)
+     * @return the shared instance
+     */
+    public static SnappyCompressor create(Map<String, String> compressionOptions)
+    {
+        // Snappy takes no option for now, so every caller gets the same object
+        return instance;
+    }
+
+    /**
+     * @param chunkLength the length of the uncompressed chunks
+     * @return what Snappy reports as the maximum compressed length for chunkLength bytes
+     */
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        final int maxLength = Snappy.maxCompressedLength(chunkLength);
+        return maxLength;
+    }
+
+    /**
+     * Compresses into the array currently wrapped by output; that array is
+     * never replaced by this implementation.
+     *
+     * @return the value returned by Snappy.rawCompress
+     */
+    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
+    {
+        final byte[] target = output.buffer;
+        return Snappy.rawCompress(input, inputOffset, inputLength, target, outputOffset);
+    }
+
+    /**
+     * Uncompresses into output, starting at outputOffset.
+     *
+     * @return the value returned by Snappy.rawUncompress
+     */
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        final int written = Snappy.rawUncompress(input, inputOffset, inputLength, output, outputOffset);
+        return written;
+    }
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
Wed Aug 31 20:30:20 2011
@@ -65,7 +65,7 @@ public class SSTableWriter extends SSTab
     private static Set<Component> components(CFMetaData metadata)
     {
         Set<Component> components = new 
HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, 
Component.PRIMARY_INDEX, Component.STATS));
-        if (metadata.useCompression())
+        if (metadata.compressionParameters().compressor != null)
             components.add(Component.COMPRESSION_INFO);
         return components;
     }
@@ -87,7 +87,8 @@ public class SSTableWriter extends SSTab
             dbuilder = SegmentedFile.getCompressedBuilder();
             dataFile = CompressedSequentialWriter.open(getFilename(),
                                                        
descriptor.filenameFor(Component.COMPRESSION_INFO),
-                                                       true);
+                                                       true,
+                                                       
metadata.compressionParameters());
         }
         else
         {

Modified: cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml 
(original)
+++ cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml Wed Aug 
31 20:30:20 2011
@@ -555,10 +555,25 @@ commands:
 
         SerializingCacheProvider serialises the contents of the row and
         stores the data off the JVM Heap. This may reduce the GC pressure.
-        NOTE: Thsi provider requires JNA.jar to be in the class path to
+        NOTE: This provider requires JNA.jar to be in the class path to
         enable native methods.
 
-        - compression: Use compression for SSTable data files. Accepts the 
values true and false.
+        - compression: Use compression for SSTable data files.
+
+        Supported values are:
+            - null: to disable compression
+            - SnappyCompressor: compression based on the Snappy algorithm
+            - DeflateCompressor: compression based on the deflate algorithm
+            (through Java native support)
+
+        It is also valid to specify the fully-qualified name of a class
+        that implements org.apache.cassandra.io.compress.ICompressor.
+
+        - compression_options: Optional additional options for compression.
+          Options have the form [{key:value}], and may depend on the
+          compression algorithm used. One generic option is chunk_length_kb,
+          which specifies the size in KB of the chunks used by compression
+          (defaults to 64, must be a power of 2).
 
         Examples:
         create column family Super4

Modified: cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java Wed Aug 31 
20:30:20 2011
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.thrift.IndexType;
@@ -259,7 +261,7 @@ public class SchemaLoader
         {
             for (CFMetaData cfm : ksm.cfMetaData().values())
             {
-                cfm.compression(true);
+                cfm.compressionParameters(new 
CompressionParameters(SnappyCompressor.instance));
             }
         }
     }

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
 Wed Aug 31 20:30:20 2011
@@ -37,7 +37,7 @@ public class CompressedRandomAccessReade
     {
         // test reset in current buffer or previous one
         testResetAndTruncate(false, 10);
-        testResetAndTruncate(false, CompressedSequentialWriter.CHUNK_LENGTH);
+        testResetAndTruncate(false, 
CompressionParameters.DEFAULT_CHUNK_LENGTH);
     }
 
     @Test
@@ -45,7 +45,7 @@ public class CompressedRandomAccessReade
     {
         // test reset in current buffer or previous one
         testResetAndTruncate(true, 10);
-        testResetAndTruncate(true, CompressedSequentialWriter.CHUNK_LENGTH);
+        testResetAndTruncate(true, CompressionParameters.DEFAULT_CHUNK_LENGTH);
     }
 
     private void testResetAndTruncate(boolean compressed, int junkSize) throws 
IOException
@@ -56,8 +56,8 @@ public class CompressedRandomAccessReade
         try
         {
             SequentialWriter writer = compressed
-                ? new CompressedSequentialWriter(f, filename + ".metadata", 
false)
-                : new SequentialWriter(f, 
CompressedSequentialWriter.CHUNK_LENGTH, false);
+                ? new CompressedSequentialWriter(f, filename + ".metadata", 
false, new CompressionParameters(SnappyCompressor.instance))
+                : new SequentialWriter(f, 
CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
 
             writer.write("The quick ".getBytes());
             FileMark mark = writer.mark();
@@ -76,7 +76,7 @@ public class CompressedRandomAccessReade
             assert f.exists();
             RandomAccessReader reader = compressed
                 ? new CompressedRandomAccessReader(filename, new 
CompressionMetadata(filename + ".metadata", f.length()), false)
-                : new RandomAccessReader(f, 
CompressedSequentialWriter.CHUNK_LENGTH, false);
+                : new RandomAccessReader(f, 
CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
             String expected = "The quick brown fox jumps over the lazy dog";
             assert reader.length() == expected.length();
             byte[] b = new byte[expected.length()];
@@ -105,7 +105,7 @@ public class CompressedRandomAccessReade
         File metadata = new File(file.getPath() + ".meta");
         metadata.deleteOnExit();
 
-        SequentialWriter writer = new CompressedSequentialWriter(file, 
metadata.getPath(), false);
+        SequentialWriter writer = new CompressedSequentialWriter(file, 
metadata.getPath(), false, new 
CompressionParameters(SnappyCompressor.instance));
 
         writer.write(CONTENT.getBytes());
         writer.close();

Modified: 
cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1163801&r1=1163800&r2=1163801&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java 
(original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java 
Wed Aug 31 20:30:20 2011
@@ -84,7 +84,7 @@ public class Session implements Serializ
         availableOptions.addOption("W",  "no-replicate-on-write",false,  "Set 
replicate_on_write to false for counters. Only counter add with CL=ONE will 
work");
         availableOptions.addOption("V",  "average-size-values",  false,  
"Generate column values of average rather than specific size");
         availableOptions.addOption("T",  "send-to",              true,   "Send 
this as a request to the stress daemon at specified address.");
-        availableOptions.addOption("I",  "compression",          false,  "Use 
sstable compression when creating schema");
+        availableOptions.addOption("I",  "compression",          true,   
"Specify the compression to use for sstable, default:no compression");
         availableOptions.addOption("Q",  "query-names",          true,   
"Comma-separated list of column names to retrieve from each row.");
     }
 
@@ -101,7 +101,7 @@ public class Session implements Serializ
     private int retryTimes       = 10;
     private int port             = 9160;
     private int superColumns     = 1;
-    private boolean compression  = false;
+    private String compression   = null;
 
     private int progressInterval  = 10;
     private int keysPerCall       = 1000;
@@ -272,7 +272,7 @@ public class Session implements Serializ
                 replicateOnWrite = false;
 
             if (cmd.hasOption("I"))
-                compression = true;
+                compression = cmd.getOptionValue("I");
 
             averageSizeValues = cmd.hasOption("V");
 


Reply via email to