Updated Branches: refs/heads/cassandra-1.2 e8732139d -> ab8a28e36 refs/heads/trunk 369415c26 -> 1d6bed3ba
fix bulk-loading compressed sstables patch by Tyler Hobbs and yukim; reviewed by jbellis for CASSANDRA-5820 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ab8a28e3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ab8a28e3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ab8a28e3 Branch: refs/heads/cassandra-1.2 Commit: ab8a28e365da8ef515a40e838e773d67ad92a282 Parents: e873213 Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Jul 30 14:51:07 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Jul 30 14:54:08 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/SSTableReader.java | 2 +- .../io/sstable/SSTableSimpleUnsortedWriter.java | 11 ++- .../io/util/CompressedPoolingSegmentedFile.java | 7 +- .../io/util/CompressedSegmentedFile.java | 9 +- .../cassandra/io/sstable/SSTableLoaderTest.java | 98 ++++++++++++++++++++ 6 files changed, 120 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bd85e88..8578855 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.9 + * fix bulk-loading compressed sstables (CASSANDRA-5820) * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter (CASSANDRA-5824) * update default LCS sstable size to 160MB (CASSANDRA-5727) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 7f52bcf..e9a03c8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -571,7 +571,7 @@ public class SSTableReader extends SSTable if (!compression) throw new IllegalStateException(this + " is not compressed"); - return ((CompressedPoolingSegmentedFile)dfile).metadata; + return ((ICompressedFile) dfile).getMetadata(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 9207276..48770d3 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -77,9 +77,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter int bufferSizeInMB, CompressionParameters compressParameters) { - super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator).compressionParameters(compressParameters), partitioner); - this.bufferSize = bufferSizeInMB * 1024L * 1024L; - this.diskWriter.start(); + this(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator).compressionParameters(compressParameters), partitioner, bufferSizeInMB); } public SSTableSimpleUnsortedWriter(File directory, @@ -93,6 +91,13 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter this(directory, partitioner, keyspace, columnFamily, comparator, subComparator, bufferSizeInMB, new CompressionParameters(null)); } + public SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, long bufferSizeInMB) + { + super(directory, metadata, partitioner); + this.bufferSize = bufferSizeInMB * 1024L * 1024L; + this.diskWriter.start(); + } + protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException { currentSize += key.key.remaining() + ColumnFamily.serializer.serializedSize(columnFamily, MessagingService.current_version) * 1.2; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java index 6d78b3e..e142f2e 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java @@ -20,7 +20,7 @@ package org.apache.cassandra.io.util; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.compress.CompressionMetadata; -public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile +public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile { public final CompressionMetadata metadata; @@ -48,6 +48,11 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile return CompressedRandomAccessReader.open(path, metadata, this); } + public CompressionMetadata getMetadata() + { + return metadata; + } + @Override public void cleanup() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java index f861e6b..f41b1aa 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java @@ -17,12 +17,10 @@ */ package org.apache.cassandra.io.util; -import java.io.File; - import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.compress.CompressionMetadata; -public class CompressedSegmentedFile extends SegmentedFile +public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile { public final CompressionMetadata metadata; @@ -52,6 +50,11 @@ public class CompressedSegmentedFile extends SegmentedFile return reader; } + public CompressionMetadata getMetadata() + { + return metadata; + } + public void cleanup() { } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java new file mode 100644 index 0000000..8fa886e --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable; + +import java.io.File; +import java.util.List; + +import com.google.common.io.Files; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Row; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.OutputHandler; + +import static junit.framework.Assert.assertEquals; + +public class SSTableLoaderTest extends SchemaLoader +{ + @BeforeClass + public static void setup() throws Exception + { + StorageService.instance.initServer(); + } + + @Test + public void testLoadingSSTable() throws Exception + { + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + "Keyspace1" + File.separator + "Standard1"); + assert dataDir.mkdirs(); + CFMetaData cfmeta = Schema.instance.getCFMetaData("Keyspace1", "Standard1"); + SSTableSimpleUnsortedWriter writer = new SSTableSimpleUnsortedWriter(dataDir, + cfmeta, + StorageService.getPartitioner(), + 1); + DecoratedKey key = Util.dk("key1"); + writer.newRow(key.key); + writer.addColumn(ByteBufferUtil.bytes("col1"), ByteBufferUtil.bytes(100), 1); + writer.close(); + + SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() + { + @Override + public void init(String keyspace) + { + try + { + for (Range<Token> range : StorageService.instance.getLocalRanges("Keyspace1")) + addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); + setPartitioner(StorageService.getPartitioner()); + } catch (ConfigurationException e) + { + throw new RuntimeException(e); + } + } + + @Override + public boolean validateColumnFamily(String keyspace, String cfName) + { + return true; + } + }, new OutputHandler.SystemOutput(false, false)); + + loader.stream().get(); + + List<Row> rows = Util.getRangeSlice(Table.open("Keyspace1").getColumnFamilyStore("Standard1")); + assertEquals(1, rows.size()); + assertEquals(key, rows.get(0).key); + assertEquals(ByteBufferUtil.bytes(100), rows.get(0).cf.getColumn(ByteBufferUtil.bytes("col1")).value()); + } +}