Updated Branches:
  refs/heads/cassandra-1.2 e8732139d -> ab8a28e36
  refs/heads/trunk 369415c26 -> 1d6bed3ba


fix bulk-loading compressed sstables
patch by Tyler Hobbs and yukim; reviewed by jbellis for CASSANDRA-5820


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ab8a28e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ab8a28e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ab8a28e3

Branch: refs/heads/cassandra-1.2
Commit: ab8a28e365da8ef515a40e838e773d67ad92a282
Parents: e873213
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Tue Jul 30 14:51:07 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Tue Jul 30 14:54:08 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/SSTableReader.java     |  2 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java | 11 ++-
 .../io/util/CompressedPoolingSegmentedFile.java |  7 +-
 .../io/util/CompressedSegmentedFile.java        |  9 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java | 98 ++++++++++++++++++++
 6 files changed, 120 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd85e88..8578855 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.9
+ * fix bulk-loading compressed sstables (CASSANDRA-5820)
  * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter 
    (CASSANDRA-5824)
  * update default LCS sstable size to 160MB (CASSANDRA-5727)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 7f52bcf..e9a03c8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -571,7 +571,7 @@ public class SSTableReader extends SSTable
         if (!compression)
             throw new IllegalStateException(this + " is not compressed");
 
-        return ((CompressedPoolingSegmentedFile)dfile).metadata;
+        return ((ICompressedFile) dfile).getMetadata();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 9207276..48770d3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -77,9 +77,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
                                        int bufferSizeInMB,
                                        CompressionParameters compressParameters)
     {
-        super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator).compressionParameters(compressParameters), partitioner);
-        this.bufferSize = bufferSizeInMB * 1024L * 1024L;
-        this.diskWriter.start();
+        this(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator).compressionParameters(compressParameters), partitioner, bufferSizeInMB);
     }
 
     public SSTableSimpleUnsortedWriter(File directory,
@@ -93,6 +91,13 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         this(directory, partitioner, keyspace, columnFamily, comparator, subComparator, bufferSizeInMB, new CompressionParameters(null));
     }
 
+    public SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, long bufferSizeInMB)
+    {
+        super(directory, metadata, partitioner);
+        this.bufferSize = bufferSizeInMB * 1024L * 1024L;
+        this.diskWriter.start();
+    }
+
     protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
     {
         currentSize += key.key.remaining() + ColumnFamily.serializer.serializedSize(columnFamily, MessagingService.current_version) * 1.2;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 6d78b3e..e142f2e 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.io.util;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
-public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile
+public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
 {
     public final CompressionMetadata metadata;
 
@@ -48,6 +48,11 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile
         return CompressedRandomAccessReader.open(path, metadata, this);
     }
 
+    public CompressionMetadata getMetadata()
+    {
+        return metadata;
+    }
+
     @Override
     public void cleanup()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index f861e6b..f41b1aa 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.File;
-
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
-public class CompressedSegmentedFile extends SegmentedFile
+public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
 {
     public final CompressionMetadata metadata;
 
@@ -52,6 +50,11 @@ public class CompressedSegmentedFile extends SegmentedFile
         return reader;
     }
 
+    public CompressionMetadata getMetadata()
+    {
+        return metadata;
+    }
+
     public void cleanup()
     {
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab8a28e3/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
new file mode 100644
index 0000000..8fa886e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.util.List;
+
+import com.google.common.io.Files;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+import static junit.framework.Assert.assertEquals;
+
+public class SSTableLoaderTest extends SchemaLoader
+{
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        StorageService.instance.initServer();
+    }
+
+    @Test
+    public void testLoadingSSTable() throws Exception
+    {
+        File tempdir = Files.createTempDir();
+        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + "Keyspace1" + File.separator + "Standard1");
+        assert dataDir.mkdirs();
+        CFMetaData cfmeta = Schema.instance.getCFMetaData("Keyspace1", "Standard1");
+        SSTableSimpleUnsortedWriter writer = new SSTableSimpleUnsortedWriter(dataDir,
+                                                                             cfmeta,
+                                                                             StorageService.getPartitioner(),
+                                                                             1);
+        DecoratedKey key = Util.dk("key1");
+        writer.newRow(key.key);
+        writer.addColumn(ByteBufferUtil.bytes("col1"), ByteBufferUtil.bytes(100), 1);
+        writer.close();
+
+        SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+        {
+            @Override
+            public void init(String keyspace)
+            {
+                try
+                {
+                    for (Range<Token> range : StorageService.instance.getLocalRanges("Keyspace1"))
+                        addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+                    setPartitioner(StorageService.getPartitioner());
+                } catch (ConfigurationException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public boolean validateColumnFamily(String keyspace, String cfName)
+            {
+                return true;
+            }
+        }, new OutputHandler.SystemOutput(false, false));
+
+        loader.stream().get();
+
+        List<Row> rows = Util.getRangeSlice(Table.open("Keyspace1").getColumnFamilyStore("Standard1"));
+        assertEquals(1, rows.size());
+        assertEquals(key, rows.get(0).key);
+        assertEquals(ByteBufferUtil.bytes(100), rows.get(0).cf.getColumn(ByteBufferUtil.bytes("col1")).value());
+    }
+}

Reply via email to