Repository: cassandra Updated Branches: refs/heads/trunk eb5a59a31 -> adfbf518e
Run CommitLog tests with different compression settings patch by Benjamin Lerer; reviewed by Branimir Lambov for CASSANDRA-9039 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c445d6b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c445d6b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c445d6b Branch: refs/heads/trunk Commit: 6c445d6b7f3c8933a0bfd599ba8455b7254a323d Parents: b8f5c1f Author: Benjamin Lerer <b.le...@gmail.com> Authored: Thu Jun 2 12:31:31 2016 +0200 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Thu Jun 2 12:31:31 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/commitlog/CommitLog.java | 71 ++++++++++- .../db/commitlog/CommitLogSegment.java | 5 +- .../db/commitlog/CommitLogSegmentManager.java | 15 ++- .../db/commitlog/CompressedSegment.java | 6 +- .../db/commitlog/CommitLogStressTest.java | 2 +- .../cassandra/db/RecoveryManager2Test.java | 36 ++++++ .../cassandra/db/RecoveryManager3Test.java | 33 +++++ .../cassandra/db/RecoveryManagerTest.java | 42 +++++-- .../db/RecoveryManagerTruncateTest.java | 35 ++++++ .../db/commitlog/CommitLogDescriptorTest.java | 103 ++++++++++++++++ .../cassandra/db/commitlog/CommitLogTest.java | 121 ++++++------------- .../db/commitlog/CommitLogUpgradeTestMaker.java | 2 +- 13 files changed, 358 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c97293d..9752d16 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.7 + * Run CommitLog tests with different compression settings (CASSANDRA-9039) * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587) * Fix possible race condition in CommitLog.recover (CASSANDRA-11743) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 9a6ba34..460ecfe 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -70,8 +70,7 @@ public class CommitLog implements CommitLogMBean final CommitLogMetrics metrics; final AbstractCommitLogService executor; - final ICompressor compressor; - public ParameterizedClass compressorClass; + volatile Configuration configuration; final public String location; static private CommitLog construct() @@ -93,12 +92,10 @@ public class CommitLog implements CommitLogMBean @VisibleForTesting CommitLog(String location, CommitLogArchiver archiver) { - compressorClass = DatabaseDescriptor.getCommitLogCompression(); this.location = location; - ICompressor compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null; + this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression()); DatabaseDescriptor.createAllDirectories(); - this.compressor = compressor; this.archiver = archiver; metrics = new CommitLogMetrics(); @@ -412,6 +409,7 @@ public class CommitLog implements CommitLogMBean public int resetUnsafe(boolean deleteSegments) throws IOException { stopUnsafe(deleteSegments); + resetConfiguration(); return restartUnsafe(); } @@ -434,6 +432,14 @@ public class CommitLog implements CommitLogMBean } /** + * FOR TESTING PURPOSES. + */ + public void resetConfiguration() + { + this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression()); + } + + /** * FOR TESTING PURPOSES. See CommitLogAllocator */ public int restartUnsafe() throws IOException @@ -488,4 +494,59 @@ public class CommitLog implements CommitLogMBean throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy()); } } + + public static final class Configuration + { + /** + * The compressor class. + */ + private final ParameterizedClass compressorClass; + + /** + * The compressor used to compress the segments. + */ + private final ICompressor compressor; + + public Configuration(ParameterizedClass compressorClass) + { + this.compressorClass = compressorClass; + this.compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null; + } + + /** + * Checks if the segments must be compressed. + * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise. + */ + public boolean useCompression() + { + return compressor != null; + } + + /** + * Returns the compressor used to compress the segments. + * @return the compressor used to compress the segments + */ + public ICompressor getCompressor() + { + return compressor; + } + + /** + * Returns the compressor class. + * @return the compressor class + */ + public ParameterizedClass getCompressorClass() + { + return compressorClass; + } + + /** + * Returns the compressor name. + * @return the compressor name. + */ + public String getCompressorName() + { + return useCompression() ? compressor.getClass().getSimpleName() : "none"; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index b6801d2..ba28f3e 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -119,7 +119,8 @@ public abstract class CommitLogSegment static CommitLogSegment createSegment(CommitLog commitLog) { - return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog); + return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog) + : new MemoryMappedSegment(commitLog); } static long getNextId() @@ -136,7 +137,7 @@ public abstract class CommitLogSegment { this.commitLog = commitLog; id = getNextId(); - descriptor = new CommitLogDescriptor(id, commitLog.compressorClass); + descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass()); logFile = new File(commitLog.location, descriptor.fileName()); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index 636c73b..8670fd7 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -491,13 +491,16 @@ public class CommitLogSegmentManager throw new RuntimeException(e); } - for (CommitLogSegment segment : activeSegments) - closeAndDeleteSegmentUnsafe(segment, deleteSegments); - activeSegments.clear(); + synchronized (this) + { + for (CommitLogSegment segment : activeSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + activeSegments.clear(); - for (CommitLogSegment segment : availableSegments) - closeAndDeleteSegmentUnsafe(segment, deleteSegments); - availableSegments.clear(); + for (CommitLogSegment segment : availableSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + availableSegments.clear(); + } allocatingFrom = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index 8c62536..219709b 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@ -61,7 +61,7 @@ public class CompressedSegment extends CommitLogSegment CompressedSegment(CommitLog commitLog) { super(commitLog); - this.compressor = commitLog.compressor; + this.compressor = commitLog.configuration.getCompressor(); try { channel.write((ByteBuffer) buffer.duplicate().flip()); @@ -84,7 +84,9 @@ public class CompressedSegment extends CommitLogSegment if (buf == null) { // this.compressor is not yet set, so we must use the commitLog's one. - buf = commitLog.compressor.preferredBufferType().allocate(DatabaseDescriptor.getCommitLogSegmentSize()); + buf = commitLog.configuration.getCompressor() + .preferredBufferType() + .allocate(DatabaseDescriptor.getCommitLogSegmentSize()); } else buf.clear(); return buf; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index f9b4156..4604c49 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -218,7 +218,7 @@ public class CommitLogStressTest { System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n", mb(DatabaseDescriptor.getCommitLogSegmentSize()), - commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", + commitLog.configuration.getCompressorName(), commitLog.executor.getClass().getSimpleName(), randomSize ? " random size" : "", discardedRun ? " with discarded run" : ""); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java b/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java index 13c3452..3beb28e 100644 --- a/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java @@ -21,22 +21,37 @@ package org.apache.cassandra.db; */ +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.cassandra.Util.column; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.SnappyCompressor; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +@RunWith(Parameterized.class) public class RecoveryManager2Test { private static Logger logger = LoggerFactory.getLogger(RecoveryManager2Test.class); @@ -56,6 +71,27 @@ public class RecoveryManager2Test SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2)); } + public RecoveryManager2Test(ParameterizedClass commitLogCompression) + { + DatabaseDescriptor.setCommitLogCompression(commitLogCompression); + } + + @Before + public void setUp() throws IOException + { + CommitLog.instance.resetUnsafe(true); + } + + @Parameters() + public static Collection<Object[]> generateData() + { + return Arrays.asList(new Object[][] { + { null }, // No compression + { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } }); + } + @Test /* test that commit logs do not replay flushed data */ public void testWithFlush() throws Exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java b/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java index a94d94d..2dd7eae 100644 --- a/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java @@ -23,22 +23,34 @@ package org.apache.cassandra.db; import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.SnappyCompressor; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.SimpleStrategy; import static org.apache.cassandra.Util.column; import static org.apache.cassandra.db.KeyspaceTest.assertColumns; +@RunWith(Parameterized.class) public class RecoveryManager3Test { private static final String KEYSPACE1 = "RecoveryManager3Test1"; @@ -47,6 +59,27 @@ public class RecoveryManager3Test private static final String KEYSPACE2 = "RecoveryManager3Test2"; private static final String CF_STANDARD3 = "Standard3"; + public RecoveryManager3Test(ParameterizedClass commitLogCompression) + { + DatabaseDescriptor.setCommitLogCompression(commitLogCompression); + } + + @Before + public void setUp() throws IOException + { + CommitLog.instance.resetUnsafe(true); + } + + @Parameters() + public static Collection<Object[]> generateData() + { + return Arrays.asList(new Object[][] { + { null }, // No compression + { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } }); + } + @BeforeClass public static void defineSchema() throws ConfigurationException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java index c9abe0d..5676b99 100644 --- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java @@ -19,31 +19,38 @@ package org.apache.cassandra.db; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.concurrent.TimeUnit; -import org.apache.cassandra.OrderedJUnit4ClassRunner; -import org.apache.cassandra.Util; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.marshal.CounterColumnType; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.locator.SimpleStrategy; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogArchiver; +import org.apache.cassandra.db.marshal.CounterColumnType; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.SnappyCompressor; +import org.apache.cassandra.locator.SimpleStrategy; +import static org.apache.cassandra.Util.cellname; import static org.apache.cassandra.Util.column; import static org.apache.cassandra.db.KeyspaceTest.assertColumns; -import static org.apache.cassandra.Util.cellname; -@RunWith(OrderedJUnit4ClassRunner.class) +@RunWith(Parameterized.class) public class RecoveryManagerTest { private static final String KEYSPACE1 = "RecoveryManagerTest1"; @@ -68,6 +75,21 @@ public class RecoveryManagerTest SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3)); } + public RecoveryManagerTest(ParameterizedClass commitLogCompression) + { + DatabaseDescriptor.setCommitLogCompression(commitLogCompression); + } + + @Parameters() + public static Collection<Object[]> generateData() + { + return Arrays.asList(new Object[][] { + { null }, // No compression + { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } }); + } + @Test public void testNothingToRecover() throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java index a004105..769316f 100644 --- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java @@ -22,26 +22,61 @@ import static org.apache.cassandra.Util.column; import static org.junit.Assert.*; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.SnappyCompressor; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.ByteBufferUtil; /** * Test for the truncate operation. */ +@RunWith(Parameterized.class) public class RecoveryManagerTruncateTest { private static final String KEYSPACE1 = "RecoveryManagerTruncateTest"; private static final String CF_STANDARD1 = "Standard1"; private static final String CF_STANDARD2 = "Standard2"; + public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression) + { + DatabaseDescriptor.setCommitLogCompression(commitLogCompression); + } + + @Before + public void setUp() throws IOException + { + CommitLog.instance.resetUnsafe(true); + } + + @Parameters() + public static Collection<Object[]> generateData() + { + return Arrays.asList(new Object[][] { + { null }, // No compression + { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } }); + } + @BeforeClass public static void defineSchema() throws ConfigurationException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java new file mode 100644 index 0000000..8d63959 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; + +import org.junit.Test; + +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.ByteBufferDataInput; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.net.MessagingService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class CommitLogDescriptorTest +{ + @Test + public void testVersions() + { + assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log")); + assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log")); + assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log")); + assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log")); + assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log")); + + assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id); + + assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion()); + String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; + assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); + } + + private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException + { + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc); + long length = buf.position(); + // Put some extra data in the stream. + buf.putDouble(0.1); + buf.flip(); + try (FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0)) + { + CommitLogDescriptor read = CommitLogDescriptor.readHeader(input); + assertEquals("Descriptor length", length, input.getFilePointer()); + assertEquals("Descriptors", desc, read); + } + } + + @Test + public void testDescriptorPersistence() throws IOException + { + testDescriptorPersistence(new CommitLogDescriptor(11, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null))); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19, + new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")))); + } + + @Test + public void testDescriptorInvalidParametersSize() throws IOException + { + Map<String, String> params = new HashMap<>(); + for (int i=0; i<6000; ++i) + params.put("key"+i, Integer.toString(i, 16)); + try { + CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, + 21, + new ParameterizedClass("LZ4Compressor", params)); + ByteBuffer buf = ByteBuffer.allocate(1024000); + CommitLogDescriptor.writeHeader(buf, desc); + fail("Parameter object too long should fail on writing descriptor."); + } catch (ConfigurationException e) + { + // correct path + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 0ad880b..9999b42 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -19,53 +19,46 @@ package org.apache.cassandra.db.commitlog; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.zip.CRC32; import java.util.zip.Checksum; -import com.google.common.collect.ImmutableMap; - import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.ReadCommand; -import org.apache.cassandra.db.Row; -import org.apache.cassandra.db.SliceByNamesReadCommand; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.util.ByteBufferDataInput; -import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.SnappyCompressor; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.*; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; +@RunWith(Parameterized.class) public class CommitLogTest { private static final String KEYSPACE1 = "CommitLogTest"; @@ -73,6 +66,27 @@ public class CommitLogTest private static final String CF1 = "Standard1"; private static final String CF2 = "Standard2"; + public CommitLogTest(ParameterizedClass commitLogCompression) + { + DatabaseDescriptor.setCommitLogCompression(commitLogCompression); + } + + @Before + public void setUp() throws IOException + { + CommitLog.instance.resetUnsafe(true); + } + + @Parameters() + public static Collection<Object[]> generateData() + { + return Arrays.asList(new Object[][] { + { null }, // No compression + { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) }, + { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } }); + } + @BeforeClass public static void defineSchema() throws ConfigurationException { @@ -194,7 +208,6 @@ public class CommitLogTest @Test public void testDontDeleteIfDirty() throws Exception { - CommitLog.instance.resetUnsafe(true); // Roughly 32 MB mutation Mutation rm = new Mutation(KEYSPACE1, bytes("k")); rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0); @@ -224,7 +237,6 @@ public class CommitLogTest public void testDeleteIfNotDirty() throws Exception { DatabaseDescriptor.getCommitLogSegmentSize(); - CommitLog.instance.resetUnsafe(true); // Roughly 32 MB mutation Mutation rm = new Mutation(KEYSPACE1, bytes("k")); rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0); @@ -282,8 +294,6 @@ public class CommitLogTest @Test public void testEqualRecordLimit() throws Exception { - CommitLog.instance.resetUnsafe(true); - Mutation rm = new Mutation(KEYSPACE1, bytes("k")); rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0); CommitLog.instance.add(rm); @@ -292,7 +302,6 @@ public class CommitLogTest @Test public void testExceedRecordLimit() throws Exception { - CommitLog.instance.resetUnsafe(true); try { Mutation rm = new Mutation(KEYSPACE1, bytes("k")); @@ -423,25 +432,8 @@ public class CommitLogTest } @Test - public void testVersions() - { - Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log")); - Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log")); - - Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id); - - Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion()); - String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; - Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); - } - - @Test public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException { - CommitLog.instance.resetUnsafe(true); boolean prev = DatabaseDescriptor.isAutoSnapshot(); DatabaseDescriptor.setAutoSnapshot(false); ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1"); @@ -470,7 +462,6 @@ public class CommitLogTest @Test public void testTruncateWithoutSnapshotNonDurable() throws IOException { - CommitLog.instance.resetUnsafe(true); boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot(); DatabaseDescriptor.setAutoSnapshot(false); Keyspace notDurableKs = Keyspace.open(KEYSPACE2); @@ -494,48 +485,4 @@ public class CommitLogTest row = command.getRow(notDurableKs); Assert.assertEquals(null, row.cf); } - - private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException - { - ByteBuffer buf = ByteBuffer.allocate(1024); - CommitLogDescriptor.writeHeader(buf, desc); - long length = buf.position(); - // Put some extra data in the stream. - buf.putDouble(0.1); - buf.flip(); - FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0); - CommitLogDescriptor read = CommitLogDescriptor.readHeader(input); - Assert.assertEquals("Descriptor length", length, input.getFilePointer()); - Assert.assertEquals("Descriptors", desc, read); - } - - @Test - public void testDescriptorPersistence() throws IOException - { - testDescriptorPersistence(new CommitLogDescriptor(11, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null))); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19, - new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")))); - } - - @Test - public void testDescriptorInvalidParametersSize() throws IOException - { - Map<String, String> params = new HashMap<>(); - for (int i=0; i<6000; ++i) - params.put("key"+i, Integer.toString(i, 16)); - try { - CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, - 21, - new ParameterizedClass("LZ4Compressor", params)); - ByteBuffer buf = ByteBuffer.allocate(1024000); - CommitLogDescriptor.writeHeader(buf, desc); - Assert.fail("Parameter object too long should fail on writing descriptor."); - } catch (ConfigurationException e) - { - // correct path - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java index 7b07c8e..175a8d6 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java @@ -98,7 +98,7 @@ public class CommitLogUpgradeTestMaker CommitLog commitLog = CommitLog.instance; System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n", mb(DatabaseDescriptor.getCommitLogSegmentSize()), - commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", + commitLog.configuration.getCompressorName(), commitLog.executor.getClass().getSimpleName(), randomSize ? " random size" : ""); final List<CommitlogExecutor> threads = new ArrayList<>();