Updated Branches: refs/heads/cassandra-1.1 648e62e58 -> 5dac2086d
Support compression using BulkWriter Patch by goffinet, reviewed by Brandon Williams for CASSANDRA-3907 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5dac2086 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5dac2086 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5dac2086 Branch: refs/heads/cassandra-1.1 Commit: 5dac2086dcd4cb6845b814384d04952c986ac240 Parents: 648e62e Author: Chris Goffinet <c...@chrisgoffinet.com> Authored: Tue Feb 14 09:58:15 2012 -0800 Committer: Chris Goffinet <c...@chrisgoffinet.com> Committed: Tue Feb 14 09:58:15 2012 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/hadoop/BulkRecordWriter.java | 3 +- .../org/apache/cassandra/hadoop/ConfigHelper.java | 40 +++++++++++++++ .../io/sstable/SSTableSimpleUnsortedWriter.java | 17 ++++++- 4 files changed, 58 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9481f5e..e3207ea 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -76,6 +76,7 @@ * Finish cleanup up tombstone purge code (CASSANDRA-3872) * Avoid NPE on aboarted stream-out sessions (CASSANDRA-3904) * BulkRecordWriter throws NPE for counter columns (CASSANDRA-3906) + * Support compression using BulkWriter (CASSANDRA-3907) 1.0.8 * fix race between cleanup and flush on secondary index CFSes (CASSANDRA-3712) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index aded15e..6f056c4 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -127,7 +127,8 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> ConfigHelper.getOutputColumnFamily(conf), BytesType.instance, subcomparator, - Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64"))); + Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")), + ConfigHelper.getOutputCompressionParamaters(conf)); this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index bafb195..c9ab70e 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -22,8 +22,11 @@ package org.apache.cassandra.hadoop; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import org.apache.cassandra.io.compress.CompressionParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +71,8 @@ public class ConfigHelper private static final String OUTPUT_INITIAL_THRIFT_ADDRESS = "cassandra.output.thrift.address"; private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read"; private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write"; + private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class"; + private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length"; private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class); @@ -406,6 +411,41 @@ public class ConfigHelper } } + public static String getOutputCompressionClass(Configuration conf) + { + return conf.get(OUTPUT_COMPRESSION_CLASS); + } + + public static String getOutputCompressionChunkLength(Configuration conf) + { + return conf.get(OUTPUT_COMPRESSION_CHUNK_LENGTH, String.valueOf(CompressionParameters.DEFAULT_CHUNK_LENGTH)); + } + + public static void setOutputCompressionClass(Configuration conf, String classname) + { + conf.set(OUTPUT_COMPRESSION_CLASS, classname); + } + + public static void setOutputCompressionChunkLength(Configuration conf, String length) + { + conf.set(OUTPUT_COMPRESSION_CHUNK_LENGTH, length); + } + + public static CompressionParameters getOutputCompressionParamaters(Configuration conf) + { + if (getOutputCompressionClass(conf) == null) + return new CompressionParameters(null); + + Map<String, String> options = new HashMap<String, String>(); + options.put(CompressionParameters.SSTABLE_COMPRESSION, getOutputCompressionClass(conf)); + options.put(CompressionParameters.CHUNK_LENGTH_KB, getOutputCompressionChunkLength(conf)); + + try { + return CompressionParameters.create(options); + } catch (ConfigurationException e) { + throw new RuntimeException(e); + } + } public static Cassandra.Client getClientFromInputAddressList(Configuration conf) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index eadc16d..b869d1e 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -31,6 +31,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.HeapAllocator; @@ -75,13 +76,25 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter String columnFamily, AbstractType<?> comparator, AbstractType<?> subComparator, - int bufferSizeInMB) throws IOException + int bufferSizeInMB, + CompressionParameters compressParameters) throws IOException { - super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator), partitioner); + super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator).compressionParameters(compressParameters), partitioner); this.bufferSize = bufferSizeInMB * 1024L * 1024L; this.diskWriter.start(); } + public SSTableSimpleUnsortedWriter(File directory, + IPartitioner partitioner, + String keyspace, + String columnFamily, + AbstractType<?> comparator, + AbstractType<?> subComparator, + int bufferSizeInMB) throws IOException + { + this(directory, partitioner, keyspace, columnFamily, comparator, subComparator, bufferSizeInMB, new CompressionParameters(null)); + } + protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException { currentSize += key.key.remaining() + columnFamily.serializedSize() * 1.2;