Merge branch cassandra-3.0 into cassandra-3.7
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc6ffc25 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc6ffc25 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc6ffc25 Branch: refs/heads/cassandra-3.7 Commit: dc6ffc25a8d00659385a1219d0189bd068ef110d Parents: dbf0310 1e82695 Author: Benjamin Lerer <b.le...@gmail.com> Authored: Thu Jun 2 12:47:03 2016 +0200 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Thu Jun 2 12:50:19 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/commitlog/CommitLog.java | 102 +++++++++- .../db/commitlog/CommitLogSegment.java | 15 +- .../db/commitlog/CommitLogSegmentManager.java | 17 +- .../db/commitlog/CompressedSegment.java | 4 +- .../db/commitlog/EncryptedSegment.java | 4 +- .../db/commitlog/CommitLogStressTest.java | 12 +- .../db/RecoveryManagerFlushedTest.java | 40 ++++ .../db/RecoveryManagerMissingHeaderTest.java | 38 +++- .../cassandra/db/RecoveryManagerTest.java | 167 ++++++++++------- .../db/RecoveryManagerTruncateTest.java | 38 ++++ .../db/commitlog/CommitLogDescriptorTest.java | 3 +- .../cassandra/db/commitlog/CommitLogTest.java | 187 ++++++------------- .../db/commitlog/CommitLogUpgradeTestMaker.java | 4 +- 14 files changed, 407 insertions(+), 225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index a54f4fd,70da4ad..2a66eb4 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -22,11 -20,10 +22,12 @@@ Merged from 2.2 * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) * Possible memory leak in NIODataInputStream (CASSANDRA-11867) * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) Merged from 2.1: ++ * Run CommitLog tests with different compression settings (CASSANDRA-9039) * cqlsh: apply current keyspace to source command (CASSANDRA-11152) - * Backport CASSANDRA-11578 (CASSANDRA-11750) * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824) * Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840) * Do not consider local node a valid source during replace (CASSANDRA-11848) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 10bc91a,dcdd855..4a660ca --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@@ -96,13 -92,10 +94,11 @@@ public class CommitLog implements Commi @VisibleForTesting CommitLog(String location, CommitLogArchiver archiver) { - compressorClass = DatabaseDescriptor.getCommitLogCompression(); this.location = location; - ICompressor compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null; - this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression()); ++ this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(), ++ DatabaseDescriptor.getEncryptionContext()); DatabaseDescriptor.createAllDirectories(); - encryptionContext = DatabaseDescriptor.getEncryptionContext(); - this.compressor = compressor; this.archiver = archiver; metrics = new CommitLogMetrics(); @@@ -146,7 -139,7 +142,8 @@@ }; // submit all existing files in the commit log dir for archiving prior to recovery - CASSANDRA-6904 -- for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter)) ++ File[] listFiles = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter); ++ for (File file : listFiles) { archiver.maybeArchive(file.getPath(), file.getName()); archiver.maybeWaitForArchiving(file.getName()); @@@ -420,6 -413,6 +418,15 @@@ } /** ++ * FOR TESTING PURPOSES. ++ */ ++ public void resetConfiguration() ++ { ++ configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(), ++ DatabaseDescriptor.getEncryptionContext()); ++ } ++ ++ /** * FOR TESTING PURPOSES. See CommitLogAllocator. */ public void stopUnsafe(boolean deleteSegments) @@@ -492,4 -493,59 +499,83 @@@ throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy()); } } + + public static final class Configuration + { + /** + * The compressor class. + */ + private final ParameterizedClass compressorClass; + + /** + * The compressor used to compress the segments. + */ + private final ICompressor compressor; + - public Configuration(ParameterizedClass compressorClass) ++ /** ++ * The encryption context used to encrypt the segments. ++ */ ++ private EncryptionContext encryptionContext; ++ ++ public Configuration(ParameterizedClass compressorClass, EncryptionContext encryptionContext) + { + this.compressorClass = compressorClass; + this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null; ++ this.encryptionContext = encryptionContext; + } + + /** + * Checks if the segments must be compressed. + * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise. + */ + public boolean useCompression() + { + return compressor != null; + } + + /** ++ * Checks if the segments must be encrypted. ++ * @return <code>true</code> if the segments must be encrypted, <code>false</code> otherwise. ++ */ ++ public boolean useEncryption() ++ { ++ return encryptionContext.isEnabled(); ++ } ++ ++ /** + * Returns the compressor used to compress the segments. + * @return the compressor used to compress the segments + */ + public ICompressor getCompressor() + { + return compressor; + } + + /** + * Returns the compressor class. + * @return the compressor class + */ + public ParameterizedClass getCompressorClass() + { + return compressorClass; + } + + /** + * Returns the compressor name. + * @return the compressor name. + */ + public String getCompressorName() + { + return useCompression() ? compressor.getClass().getSimpleName() : "none"; + } ++ ++ /** ++ * Returns the encryption context used to encrypt the segments. ++ * @return the encryption context used to encrypt the segments ++ */ ++ public EncryptionContext getEncryptionContext() ++ { ++ return encryptionContext; ++ } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 8f8b523,27c05b4..2045c35 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@@ -46,6 -45,6 +46,7 @@@ import org.apache.cassandra.config.CFMe import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Mutation; ++import org.apache.cassandra.db.commitlog.CommitLog.Configuration; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; @@@ -122,11 -120,8 +123,12 @@@ public abstract class CommitLogSegmen static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose) { - CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext, onClose) : - commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) : - new MemoryMappedSegment(commitLog); - return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog, onClose) - : new MemoryMappedSegment(commitLog); ++ Configuration config = commitLog.configuration; ++ CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, onClose) ++ : config.useCompression() ? new CompressedSegment(commitLog, onClose) ++ : new MemoryMappedSegment(commitLog); + segment.writeLogHeader(); + return segment; } /** @@@ -137,7 -132,7 +139,8 @@@ */ static boolean usesBufferPool(CommitLog commitLog) { - return commitLog.encryptionContext.isEnabled() || commitLog.compressor != null; - return commitLog.configuration.useCompression(); ++ Configuration config = commitLog.configuration; ++ return config.useEncryption() || config.useCompression(); } static long getNextId() @@@ -152,7 -149,7 +155,9 @@@ { this.commitLog = commitLog; id = getNextId(); - descriptor = new CommitLogDescriptor(id, commitLog.compressorClass, commitLog.encryptionContext); - descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass()); ++ descriptor = new CommitLogDescriptor(id, ++ commitLog.configuration.getCompressorClass(), ++ commitLog.configuration.getEncryptionContext()); logFile = new File(commitLog.location, descriptor.fileName()); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index 573428a,c73a30a..684fc2c --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@@ -46,8 -68,18 +46,8 @@@ public class CompressedSegment extends */ CompressedSegment(CommitLog commitLog, Runnable onClose) { - super(commitLog); + super(commitLog, onClose); - this.compressor = commitLog.compressor; + this.compressor = commitLog.configuration.getCompressor(); - this.onClose = onClose; - try - { - channel.write((ByteBuffer) buffer.duplicate().flip()); - commitLog.allocator.addSize(lastWrittenPos = buffer.position()); - } - catch (IOException e) - { - throw new FSWriteError(e, getPath()); - } } ByteBuffer allocate(int size) @@@ -57,9 -89,21 +57,9 @@@ ByteBuffer createBuffer(CommitLog commitLog) { - return createBuffer(commitLog.compressor.preferredBufferType()); - usedBuffers.incrementAndGet(); - ByteBuffer buf = bufferPool.poll(); - if (buf == null) - { - // this.compressor is not yet set, so we must use the commitLog's one. - buf = commitLog.configuration.getCompressor() - .preferredBufferType() - .allocate(DatabaseDescriptor.getCommitLogSegmentSize()); - } else - buf.clear(); - return buf; ++ return createBuffer(commitLog.configuration.getCompressor().preferredBufferType()); } - static long startMillis = System.currentTimeMillis(); - @Override void write(int startMarker, int nextMarker) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java index 731dea4,0000000..c34a365 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java @@@ -1,161 -1,0 +1,161 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import javax.crypto.Cipher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.security.EncryptionUtils; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.SyncUtil; + +import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE; + +/** + * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data into + * the encryption algorithms. + * + * The format of the encrypted commit log is as follows: + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)}) + * - a series of 'sync segments' that are written every time the commit log is sync()'ed + * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)} + * -- total plain text length for this section + * -- a series of encrypted data blocks, each of which contains: + * --- the length of the encrypted block (cipher text) + * --- the length of the unencrypted data (compressed text) + * --- the encrypted block, which contains: + * ---- the length of the plain text (raw) data + * ---- block of compressed data + * + * Notes: + * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding + * to the output buffer, and we need to ignore that padding when processing. + */ +public class EncryptedSegment extends FileDirectSegment +{ + private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class); + + private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4; + + private final EncryptionContext encryptionContext; + private final Cipher cipher; + - public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext, Runnable onClose) ++ public EncryptedSegment(CommitLog commitLog, Runnable onClose) + { + super(commitLog, onClose); - this.encryptionContext = encryptionContext; ++ this.encryptionContext = commitLog.configuration.getEncryptionContext(); + + try + { + cipher = encryptionContext.getEncryptor(); + } + catch (IOException e) + { + throw new FSWriteError(e, logFile); + } + logger.debug("created a new encrypted commit log segment: {}", logFile); + } + + protected Map<String, String> additionalHeaderParameters() + { + Map<String, String> map = encryptionContext.toHeaderParameters(); + map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV())); + return map; + } + + ByteBuffer createBuffer(CommitLog commitLog) + { + //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption, + // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs + return createBuffer(BufferType.ON_HEAP); + } + + void write(int startMarker, int nextMarker) + { + int contentStart = startMarker + SYNC_MARKER_SIZE; + final int length = nextMarker - contentStart; + // The length may be 0 when the segment is being closed. + assert length > 0 || length == 0 && !isStillAllocating(); + + final ICompressor compressor = encryptionContext.getCompressor(); + final int blockSize = encryptionContext.getChunkLength(); + try + { + ByteBuffer inputBuffer = buffer.duplicate(); + inputBuffer.limit(contentStart + length).position(contentStart); + ByteBuffer buffer = reusableBufferHolder.get(); + + // save space for the sync marker at the beginning of this section + final long syncMarkerPosition = lastWrittenPos; + channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE); + + // loop over the segment data in encryption buffer sized chunks + while (contentStart < nextMarker) + { + int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart; + ByteBuffer slice = inputBuffer.duplicate(); + slice.limit(contentStart + nextBlockSize).position(contentStart); + + buffer = EncryptionUtils.compress(slice, buffer, true, compressor); + + // reuse the same buffer for the input and output of the encryption operation + buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher); + + contentStart += nextBlockSize; + commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE); + } + + lastWrittenPos = channel.position(); + + // rewind to the beginning of the section and write out the sync marker, + // reusing the one of the existing buffers + buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true); + writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos); + buffer.putInt(SYNC_MARKER_SIZE, length); + buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE); + commitLog.allocator.addSize(buffer.limit()); + + channel.position(syncMarkerPosition); + channel.write(buffer); + + SyncUtil.force(channel, true); + + if (reusableBufferHolder.get().capacity() < buffer.capacity()) + reusableBufferHolder.set(buffer); + } + catch (Exception e) + { + throw new FSWriteError(e, getPath()); + } + } + + public long onDiskSize() + { + return lastWrittenPos; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 8e45eea,d517055..0474b32 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@@ -200,43 -198,34 +200,43 @@@ public class CommitLogStressTes DatabaseDescriptor.setCommitLogSyncBatchWindow(1); DatabaseDescriptor.setCommitLogSyncPeriod(30); DatabaseDescriptor.setCommitLogSegmentSize(32); - for (ParameterizedClass compressor : new ParameterizedClass[] { - null, - new ParameterizedClass("LZ4Compressor", null), - new ParameterizedClass("SnappyCompressor", null), - new ParameterizedClass("DeflateCompressor", null) }) + + // test plain vanilla commit logs (the choice of 98% of users) + testLog(null, EncryptionContextGenerator.createDisabledContext()); + + // test the compression types + testLog(new ParameterizedClass("LZ4Compressor", null), EncryptionContextGenerator.createDisabledContext()); + testLog(new ParameterizedClass("SnappyCompressor", null), EncryptionContextGenerator.createDisabledContext()); + testLog(new ParameterizedClass("DeflateCompressor", null), EncryptionContextGenerator.createDisabledContext()); + + // test the encrypted commit log + testLog(null, EncryptionContextGenerator.createContext(true)); + } + + public void testLog(ParameterizedClass compression, EncryptionContext encryptionContext) throws IOException, InterruptedException + { + DatabaseDescriptor.setCommitLogCompression(compression); + DatabaseDescriptor.setEncryptionContext(encryptionContext); + for (CommitLogSync sync : CommitLogSync.values()) { - DatabaseDescriptor.setCommitLogCompression(compressor); - for (CommitLogSync sync : CommitLogSync.values()) - { - DatabaseDescriptor.setCommitLogSync(sync); - CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start(); - testLog(commitLog); - } + DatabaseDescriptor.setCommitLogSync(sync); + CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start(); + testLog(commitLog); + assert !failed; } - assert !failed; } - public void testLog(CommitLog commitLog) throws IOException, InterruptedException - { - System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n", - mb(DatabaseDescriptor.getCommitLogSegmentSize()), - commitLog.configuration.getCompressorName(), - commitLog.executor.getClass().getSimpleName(), - randomSize ? " random size" : "", - discardedRun ? " with discarded run" : ""); + public void testLog(CommitLog commitLog) throws IOException, InterruptedException { + System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, sync %s%s%s\n", + mb(DatabaseDescriptor.getCommitLogSegmentSize()), - commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", - commitLog.encryptionContext.isEnabled(), ++ commitLog.configuration.getCompressorName(), ++ commitLog.configuration.useEncryption(), + commitLog.executor.getClass().getSimpleName(), + randomSize ? " random size" : "", + discardedRun ? " with discarded run" : ""); commitLog.allocator.enableReserveSegmentCreation(); - - final List<CommitlogExecutor> threads = new ArrayList<>(); + + final List<CommitlogThread> threads = new ArrayList<>(); ScheduledExecutorService scheduled = startThreads(commitLog, threads); discardedPos = ReplayPosition.NONE; @@@ -294,17 -282,14 +294,17 @@@ Assert.fail("Failed to delete " + f); if (hash == repl.hash && cells == repl.cells) - System.out.println("Test success."); + System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n", - commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", - commitLog.encryptionContext.isEnabled(), ++ commitLog.configuration.getCompressorName(), ++ commitLog.configuration.useEncryption(), + repl.discarded, repl.skipped); else { - System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", - repl.cells, - cells, - repl.hash, - hash); + System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d - hash %d expected %d.\n", - commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", - commitLog.encryptionContext.isEnabled(), ++ commitLog.configuration.getCompressorName(), ++ commitLog.configuration.useEncryption(), + repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped, + repl.hash, hash); failed = true; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java index e24af0f,d06c112..86fa5b4 --- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java @@@ -25,13 -34,19 +34,21 @@@ import org.slf4j.Logger import org.slf4j.LoggerFactory; import org.apache.cassandra.SchemaLoader; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.compress.DeflateCompressor; + import org.apache.cassandra.io.compress.LZ4Compressor; + import org.apache.cassandra.io.compress.SnappyCompressor; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.SchemaKeyspace; ++import org.apache.cassandra.security.EncryptionContext; ++import org.apache.cassandra.security.EncryptionContextGenerator; import org.apache.cassandra.utils.FBUtilities; + @RunWith(Parameterized.class) public class RecoveryManagerFlushedTest { private static Logger logger = LoggerFactory.getLogger(RecoveryManagerFlushedTest.class); @@@ -40,14 -55,35 +57,37 @@@ private static final String CF_STANDARD1 = "Standard1"; private static final String CF_STANDARD2 = "Standard2"; - @BeforeClass - public static void defineSchema() throws ConfigurationException ++ public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext) + { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - KeyspaceParams.simple(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2)); ++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression); ++ DatabaseDescriptor.setEncryptionContext(encryptionContext); + } + - public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression) ++ @Parameters() ++ public static Collection<Object[]> generateData() + { - DatabaseDescriptor.setCommitLogCompression(commitLogCompression); ++ return Arrays.asList(new Object[][]{ ++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption ++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption ++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}}); + } + + @Before + public void setUp() throws IOException + { + CommitLog.instance.resetUnsafe(true); + } + - @Parameters() - public static Collection<Object[]> generateData() + @BeforeClass + public static void defineSchema() throws ConfigurationException { - return Arrays.asList(new Object[][] { - { null }, // No compression - { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) }, - { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) }, - { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } }); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2)); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java index 9275dae,8ac7c5d..a67e9e5 --- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java @@@ -28,13 -35,17 +35,19 @@@ import org.junit.runners.Parameterized. import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; - import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator; + import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.compress.DeflateCompressor; + import org.apache.cassandra.io.compress.LZ4Compressor; + import org.apache.cassandra.io.compress.SnappyCompressor; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.KeyspaceParams; ++import org.apache.cassandra.security.EncryptionContext; ++import org.apache.cassandra.security.EncryptionContextGenerator; + @RunWith(Parameterized.class) public class RecoveryManagerMissingHeaderTest { private static final String KEYSPACE1 = "RecoveryManager3Test1"; @@@ -43,6 -54,27 +56,29 @@@ private static final String KEYSPACE2 = "RecoveryManager3Test2"; private static final String CF_STANDARD3 = "Standard3"; - public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression) ++ public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext) + { + DatabaseDescriptor.setCommitLogCompression(commitLogCompression); ++ DatabaseDescriptor.setEncryptionContext(encryptionContext); + } + - @Before - public void setUp() throws IOException ++ @Parameters() ++ public static Collection<Object[]> generateData() + { - CommitLog.instance.resetUnsafe(true); ++ return Arrays.asList(new Object[][]{ ++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption ++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption ++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}}); + } + - @Parameters() - public static Collection<Object[]> generateData() ++ @Before ++ public void setUp() throws IOException + { - return Arrays.asList(new Object[][] { - { null }, // No compression - { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) }, - { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) }, - { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } }); ++ CommitLog.instance.resetUnsafe(true); + } + @BeforeClass public static void defineSchema() throws ConfigurationException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java index 5ac53f6,397030a..37d719e --- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java @@@ -19,40 -19,43 +19,51 @@@ package org.apache.cassandra.db; import java.io.IOException; + import java.util.Arrays; + import java.util.Collection; + import java.util.Collections; import java.util.Date; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.OrderedJUnit4ClassRunner; -import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.ColumnDefinition; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.CommitLogArchiver; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.context.CounterContext; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.compress.DeflateCompressor; + import org.apache.cassandra.io.compress.LZ4Compressor; + import org.apache.cassandra.io.compress.SnappyCompressor; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.utils.ByteBufferUtil; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; ++import org.junit.runners.Parameterized; ++import org.junit.runners.Parameterized.Parameters; import static org.junit.Assert.assertEquals; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogArchiver; +import org.apache.cassandra.schema.KeyspaceParams; ++import org.apache.cassandra.security.EncryptionContext; ++import org.apache.cassandra.security.EncryptionContextGenerator; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.db.commitlog.CommitLogReplayer; + - @RunWith(OrderedJUnit4ClassRunner.class) + @RunWith(Parameterized.class) public class RecoveryManagerTest { private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class); @@@ -123,6 -67,6 +75,29 @@@ private static final String KEYSPACE2 = "RecoveryManagerTest2"; private static final String CF_STANDARD3 = "Standard3"; ++ public RecoveryManagerTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext) ++ { ++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression); ++ DatabaseDescriptor.setEncryptionContext(encryptionContext); ++ } ++ ++ @Parameters() ++ public static Collection<Object[]> generateData() ++ { ++ return Arrays.asList(new Object[][]{ ++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption ++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption ++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}}); ++ } ++ ++ @Before ++ public void setUp() throws IOException ++ { ++ CommitLog.instance.resetUnsafe(true); ++ } ++ @BeforeClass public static void defineSchema() throws ConfigurationException { @@@ -139,6 -83,6 +114,7 @@@ @Before public void clearData() { ++ // clear data Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking(); Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER1).truncateBlocking(); Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD3).truncateBlocking(); @@@ -151,77 -103,11 +127,78 @@@ } @Test - public void testNothingToRecover() throws IOException + public void testRecoverBlocksOnBytesOutstanding() throws Exception { - CommitLog.instance.resetUnsafe(true); + long originalMaxOutstanding = CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES; + CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = 1; + CommitLogReplayer.MutationInitiator originalInitiator = CommitLogReplayer.mutationInitiator; ++ MockInitiator mockInitiator = new MockInitiator(); + CommitLogReplayer.mutationInitiator = mockInitiator; + try + { + CommitLog.instance.resetUnsafe(true); + Keyspace keyspace1 = Keyspace.open(KEYSPACE1); + Keyspace keyspace2 = Keyspace.open(KEYSPACE2); + + UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti") + .clustering("col1").add("val", "1") + .build()); + + UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti") + .clustering("col2").add("val", "1") + .build()); + + keyspace1.getColumnFamilyStore("Standard1").clearUnsafe(); + keyspace2.getColumnFamilyStore("Standard3").clearUnsafe(); + + DecoratedKey dk = Util.dk("keymulti"); + Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).isEmpty()); + Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty()); + + final AtomicReference<Throwable> err = new AtomicReference<Throwable>(); + Thread t = new Thread() { + @Override + public void run() + { + try + { + CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL + } + catch (Throwable t) + { + err.set(t); + } + } + }; + t.start(); - Assert.assertTrue(blocked.tryAcquire(1, 20, TimeUnit.SECONDS)); ++ Assert.assertTrue(mockInitiator.blocked.tryAcquire(1, 20, TimeUnit.SECONDS)); + Thread.sleep(100); + Assert.assertTrue(t.isAlive()); - blocker.release(Integer.MAX_VALUE); ++ mockInitiator.blocker.release(Integer.MAX_VALUE); + t.join(20 * 1000); + + if (err.get() != null) + throw new RuntimeException(err.get()); + + if (t.isAlive()) + { + Throwable toPrint = new Throwable(); + toPrint.setStackTrace(Thread.getAllStackTraces().get(t)); + toPrint.printStackTrace(System.out); + } + Assert.assertFalse(t.isAlive()); + + Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator())); + Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator())); + } + finally + { + CommitLogReplayer.mutationInitiator = originalInitiator; + CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = originalMaxOutstanding; + } } + @Test public void testOne() throws IOException { @@@ -273,8 -159,8 +250,8 @@@ @Test public void testRecoverPIT() throws Exception { -- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); CommitLog.instance.resetUnsafe(true); ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12"); long timeMS = date.getTime() - 5000; @@@ -301,8 -187,8 +278,8 @@@ @Test public void testRecoverPITUnordered() throws Exception { -- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); CommitLog.instance.resetUnsafe(true); ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12"); long timeMS = date.getTime(); @@@ -332,4 -218,4 +309,64 @@@ assertEquals(2, Util.getAll(Util.cmd(cfs).build()).size()); } ++ ++ private static class MockInitiator extends CommitLogReplayer.MutationInitiator ++ { ++ final Semaphore blocker = new Semaphore(0); ++ final Semaphore blocked = new Semaphore(0); ++ ++ @Override ++ protected Future<Integer> initiateMutation(final Mutation mutation, ++ final long segmentId, ++ final int serializedSize, ++ final int entryLocation, ++ final CommitLogReplayer clr) ++ { ++ final Future<Integer> toWrap = super.initiateMutation(mutation, ++ segmentId, ++ serializedSize, ++ entryLocation, ++ clr); ++ return new Future<Integer>() ++ { ++ ++ @Override ++ public boolean cancel(boolean mayInterruptIfRunning) ++ { ++ throw new UnsupportedOperationException(); ++ } ++ ++ @Override ++ public boolean isCancelled() ++ { ++ throw new UnsupportedOperationException(); ++ } ++ ++ @Override ++ public boolean isDone() ++ { ++ return blocker.availablePermits() > 0 && toWrap.isDone(); ++ } ++ ++ @Override ++ public Integer get() throws InterruptedException, ExecutionException ++ { ++ System.out.println("Got blocker once"); ++ blocked.release(); ++ blocker.acquire(); ++ return toWrap.get(); ++ } ++ ++ @Override ++ public Integer get(long timeout, TimeUnit unit) ++ throws InterruptedException, ExecutionException, TimeoutException ++ { ++ blocked.release(); ++ blocker.tryAcquire(1, timeout, unit); ++ return toWrap.get(timeout, unit); ++ } ++ ++ }; ++ } ++ }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java index 7c8ab7d,5a59f1c..738888f --- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java @@@ -19,17 -19,29 +19,31 @@@ package org.apache.cassandra.db; import java.io.IOException; + import java.util.Arrays; + import java.util.Collection; + import java.util.Collections; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.compress.DeflateCompressor; + import org.apache.cassandra.io.compress.LZ4Compressor; + import org.apache.cassandra.io.compress.SnappyCompressor; import org.apache.cassandra.schema.KeyspaceParams; ++import org.apache.cassandra.security.EncryptionContext; ++import org.apache.cassandra.security.EncryptionContextGenerator; + ++import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; ++import org.junit.runner.RunWith; ++import org.junit.runners.Parameterized; ++import org.junit.runners.Parameterized.Parameters; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * Test for the truncate operation. @@@ -39,6 -52,27 +54,29 @@@ public class RecoveryManagerTruncateTes private static final String KEYSPACE1 = "RecoveryManagerTruncateTest"; private static final String CF_STANDARD1 = "Standard1"; - public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression) ++ public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext) + { + DatabaseDescriptor.setCommitLogCompression(commitLogCompression); ++ DatabaseDescriptor.setEncryptionContext(encryptionContext); + } + - @Before - public void setUp() throws IOException ++ @Parameters() ++ public static Collection<Object[]> generateData() + { - CommitLog.instance.resetUnsafe(true); ++ return Arrays.asList(new Object[][]{ ++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption ++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption ++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}}); + } + - @Parameters() - public static Collection<Object[]> generateData() ++ @Before ++ public void setUp() throws IOException + { - return Arrays.asList(new Object[][] { - { null }, // No compression - { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) }, - { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) }, - { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } }); ++ CommitLog.instance.resetUnsafe(true); + } + @BeforeClass public static void defineSchema() throws ConfigurationException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java index ab9cb6f,898c19f..fdedafd --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java @@@ -15,6 -15,6 +15,7 @@@ * See the License for the specific language governing permissions and * limitations under the License. */ ++ package org.apache.cassandra.db.commitlog; import java.io.IOException; @@@ -117,195 -83,20 +118,195 @@@ public class CommitLogDescriptorTes @Test public void testDescriptorInvalidParametersSize() throws IOException { - final int numberOfParameters = 65535; - Map<String, String> params = new HashMap<>(numberOfParameters); - for (int i=0; i<numberOfParameters; ++i) + Map<String, String> params = new HashMap<>(); + for (int i=0; i<65535; ++i) params.put("key"+i, Integer.toString(i, 16)); try { - CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, + CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 21, - new ParameterizedClass("LZ4Compressor", params)); + new ParameterizedClass("LZ4Compressor", params), + neverEnabledEncryption); ++ ByteBuffer buf = ByteBuffer.allocate(1024000); CommitLogDescriptor.writeHeader(buf, desc); - fail("Parameter object too long should fail on writing descriptor."); + Assert.fail("Parameter object too long should fail on writing descriptor."); } catch (ConfigurationException e) { // correct path } } + + @Test + public void constructParametersString_NoCompressionOrEncryption() + { + String json = CommitLogDescriptor.constructParametersString(null, null, Collections.emptyMap()); + Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY)); + Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER)); + + json = CommitLogDescriptor.constructParametersString(null, neverEnabledEncryption, Collections.emptyMap()); + Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY)); + Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER)); + } + + @Test + public void constructParametersString_WithCompressionAndEncryption() + { + String json = CommitLogDescriptor.constructParametersString(compression, enabledEncryption, Collections.emptyMap()); + Assert.assertTrue(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY)); + Assert.assertTrue(json.contains(EncryptionContext.ENCRYPTION_CIPHER)); + } + + @Test + public void writeAndReadHeader_NoCompressionOrEncryption() throws IOException + { + CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption); + Assert.assertNotNull(result); + Assert.assertNull(result.compression); + Assert.assertFalse(result.getEncryptionContext().isEnabled()); + } + + @Test + public void writeAndReadHeader_OnlyCompression() throws IOException + { + CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption); + Assert.assertNotNull(result); + Assert.assertEquals(compression, result.compression); + Assert.assertFalse(result.getEncryptionContext().isEnabled()); + } + + @Test + public void writeAndReadHeader_WithEncryptionHeader_EncryptionEnabledInYaml() throws IOException + { + CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption); + Assert.assertNotNull(result); + Assert.assertNull(result.compression); + Assert.assertTrue(result.getEncryptionContext().isEnabled()); + Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV()); + } + + /** + * Check that even though enabledTdeOptions is disabled in the yaml, we can still read the commit log header as encrypted. + */ + @Test + public void writeAndReadHeader_WithEncryptionHeader_EncryptionDisabledInYaml() throws IOException + { + CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, previouslyEnabledEncryption); + Assert.assertNotNull(result); + Assert.assertNull(result.compression); + Assert.assertTrue(result.getEncryptionContext().isEnabled()); + Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV()); + } + + /** + * Shouldn't happen in the real world (should only have either compression or enabledTdeOptions), but the header + * functionality should be correct + */ + @Test + public void writeAndReadHeader_WithCompressionAndEncryption() throws IOException + { + CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption); + Assert.assertNotNull(result); + Assert.assertEquals(compression, result.compression); + Assert.assertTrue(result.getEncryptionContext().isEnabled()); + Assert.assertEquals(enabledEncryption, result.getEncryptionContext()); + Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV()); + } + + @Test + public void equals_NoCompressionOrEncryption() + { + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null); + Assert.assertEquals(desc1, desc1); + + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc2); + } + + @Test + public void equals_OnlyCompression() + { + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null); + Assert.assertEquals(desc1, desc1); + + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc2); + } + + @Test + public void equals_OnlyEncryption() + { + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption); + Assert.assertEquals(desc1, desc1); + + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc2); + } + + /** + * Shouldn't have both enabled in real life, but ensure they are correct, nonetheless + */ + @Test + public void equals_BothCompressionAndEncryption() + { + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption); + Assert.assertEquals(desc1, desc1); + + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption); + Assert.assertEquals(desc1, desc2); + } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 1ea0eb1,39ba886..caa9fee --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@@ -26,9 -34,13 +26,12 @@@ import java.util.concurrent.ExecutionEx import java.util.zip.CRC32; import java.util.zip.Checksum; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import com.google.common.collect.Iterables; + +import org.junit.*; + import org.junit.runner.RunWith; + import org.junit.runners.Parameterized; + import org.junit.runners.Parameterized.Parameters; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@@ -46,13 -58,11 +52,16 @@@ import org.apache.cassandra.exceptions. import org.apache.cassandra.io.compress.DeflateCompressor; import org.apache.cassandra.io.compress.LZ4Compressor; import org.apache.cassandra.io.compress.SnappyCompressor; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.security.EncryptionContextGenerator; - import org.apache.cassandra.utils.*; ++import org.apache.cassandra.utils.Hex; + import org.apache.cassandra.utils.JVMStabilityInspector; + import org.apache.cassandra.utils.KillerForTests; ++import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.vint.VIntCoding; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@@ -66,7 -77,26 +76,22 @@@ public class CommitLogTes private static final String STANDARD1 = "Standard1"; private static final String STANDARD2 = "Standard2"; - String logDirectory; - public CommitLogTest(ParameterizedClass commitLogCompression) ++ public CommitLogTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext) + { + DatabaseDescriptor.setCommitLogCompression(commitLogCompression); - } - - @Before - public void setUp() throws IOException - { - CommitLog.instance.resetUnsafe(true); ++ DatabaseDescriptor.setEncryptionContext(encryptionContext); + } + + @Parameters() + public static Collection<Object[]> generateData() + { - return Arrays.asList(new Object[][] { - { null }, // No compression - { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) }, - { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) }, - { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } }); ++ return Arrays.asList(new Object[][]{ ++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption ++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption ++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, ++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}}); + } @BeforeClass public static void defineSchema() throws ConfigurationException @@@ -83,13 -113,6 +108,12 @@@ CompactionManager.instance.disableAutoCompaction(); } + @Before + public void setup() throws IOException + { - logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit"; - new File(logDirectory).mkdirs(); ++ CommitLog.instance.resetUnsafe(true); + } + @Test public void testRecoveryWithEmptyLog() throws Exception { @@@ -302,17 -330,25 +322,16 @@@ CommitLog.instance.add(rm); } - @Test + @Test(expected = IllegalArgumentException.class) public void testExceedRecordLimit() throws Exception { -- CommitLog.instance.resetUnsafe(true); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); - try - { - Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize())) - .build(); - CommitLog.instance.add(rm); - throw new AssertionError("mutation larger than limit was accepted"); - } - catch (IllegalArgumentException e) - { - // IAE is thrown on too-large mutations - } + Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize())) + .build(); + CommitLog.instance.add(rm); + throw new AssertionError("mutation larger than limit was accepted"); } protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception @@@ -333,49 -369,10 +352,50 @@@ testRecovery(out.toByteArray(), CommitLogReplayException.class); } + /** + * Create a temporary commit log file with an appropriate descriptor at the head. + * + * @return the commit log file reference and the first position after the descriptor in the file + * (so that subsequent writes happen at the correct file location). + */ + protected Pair<File, Integer> tmpFile() throws IOException + { + EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext(); + CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.current_version, + CommitLogSegment.getNextId(), + DatabaseDescriptor.getCommitLogCompression(), + encryptionContext); + - // if we're testing encryption, we need to write out a cipher IV to the descriptor headers - Map<String, String> additionalHeaders = new HashMap<>(); - if (encryptionContext.isEnabled()) - { - byte[] buf = new byte[16]; - new Random().nextBytes(buf); - additionalHeaders.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf)); - } + + ByteBuffer buf = ByteBuffer.allocate(1024); - CommitLogDescriptor.writeHeader(buf, desc, additionalHeaders); ++ CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(encryptionContext)); + buf.flip(); + int positionAfterHeader = buf.limit() + 1; + - File logFile = new File(logDirectory, desc.fileName()); - logFile.deleteOnExit(); ++ File logFile = new File(DatabaseDescriptor.getCommitLogLocation(), desc.fileName()); + + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(buf.array(), 0, buf.limit()); + } + + return Pair.create(logFile, positionAfterHeader); + } + ++ private Map<String, String> getAdditionalHeaders(EncryptionContext encryptionContext) ++ { ++ if (!encryptionContext.isEnabled()) ++ return Collections.emptyMap(); ++ ++ // if we're testing encryption, we need to write out a cipher IV to the descriptor headers ++ byte[] buf = new byte[16]; ++ new Random().nextBytes(buf); ++ return Collections.singletonMap(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf)); ++ } ++ protected File tmpFile(int version) throws IOException { File logFile = File.createTempFile("CommitLog-" + version + "-", ".log"); -- logFile.deleteOnExit(); assert logFile.length() == 0; return logFile; } @@@ -397,9 -394,9 +417,9 @@@ File logFile = tmpFile(desc.version); CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName()); // Change id to match file. - desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression); + desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression, desc.getEncryptionContext()); ByteBuffer buf = ByteBuffer.allocate(1024); -- CommitLogDescriptor.writeHeader(buf, desc); ++ CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(desc.getEncryptionContext())); try (OutputStream lout = new FileOutputStream(logFile)) { lout.write(buf.array(), 0, buf.position()); @@@ -440,11 -437,11 +460,8 @@@ protected void runExpecting(Callable<Void> r, Class<?> expected) { -- JVMStabilityInspector.Killer originalKiller; -- KillerForTests killerForTests; -- -- killerForTests = new KillerForTests(); -- originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); ++ KillerForTests killerForTests = new KillerForTests(); ++ JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); Throwable caught = null; try @@@ -466,21 -463,8 +483,23 @@@ protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception { ++ ParameterizedClass commitLogCompression = DatabaseDescriptor.getCommitLogCompression(); ++ EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext(); runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected); - runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext()), logData), expected); - runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null), logData), expected); ++ runExpecting(() -> testRecovery(new CommitLogDescriptor(4, commitLogCompression, encryptionContext), logData), expected); + } + + protected void testRecovery(byte[] logData) throws Exception + { + Pair<File, Integer> pair = tmpFile(); + try (RandomAccessFile raf = new RandomAccessFile(pair.left, "rw")) + { + raf.seek(pair.right); + raf.write(logData); + raf.close(); + + CommitLog.instance.recover(pair.left); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ + } } @Test @@@ -489,7 -473,7 +508,6 @@@ boolean originalState = DatabaseDescriptor.isAutoSnapshot(); try { -- CommitLog.instance.resetUnsafe(true); boolean prev = DatabaseDescriptor.isAutoSnapshot(); DatabaseDescriptor.setAutoSnapshot(false); ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); @@@ -549,183 -532,5 +566,103 @@@ DatabaseDescriptor.setAutoSnapshot(originalState); } } + + @Test - public void replay_StandardMmapped() throws IOException - { - ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression(); - EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext(); - try - { - DatabaseDescriptor.setCommitLogCompression(null); - DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext()); - CommitLog.instance.resetUnsafe(true); - replaySimple(CommitLog.instance); - replayWithDiscard(CommitLog.instance); - } - finally - { - DatabaseDescriptor.setCommitLogCompression(originalCompression); - DatabaseDescriptor.setEncryptionContext(originalEncryptionContext); - CommitLog.instance.resetUnsafe(true); - } - } - - @Test - public void replay_Compressed_LZ4() throws IOException - { - replay_Compressed(new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap())); - } - - @Test - public void replay_Compressed_Snappy() throws IOException - { - replay_Compressed(new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap())); - } - - @Test - public void replay_Compressed_Deflate() throws IOException - { - replay_Compressed(new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap())); - } - - private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException - { - ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression(); - EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext(); - try - { - DatabaseDescriptor.setCommitLogCompression(parameterizedClass); - DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext()); - CommitLog.instance.resetUnsafe(true); - - replaySimple(CommitLog.instance); - replayWithDiscard(CommitLog.instance); - } - finally - { - DatabaseDescriptor.setCommitLogCompression(originalCompression); - DatabaseDescriptor.setEncryptionContext(originalEncryptionContext); - CommitLog.instance.resetUnsafe(true); - } - } - - @Test - public void replay_Encrypted() throws IOException - { - ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression(); - EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext(); - try - { - DatabaseDescriptor.setCommitLogCompression(null); - DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true)); - CommitLog.instance.resetUnsafe(true); - - replaySimple(CommitLog.instance); - replayWithDiscard(CommitLog.instance); - } - finally - { - DatabaseDescriptor.setCommitLogCompression(originalCompression); - DatabaseDescriptor.setEncryptionContext(originalEncryptionContext); - CommitLog.instance.resetUnsafe(true); - } - } - - private void replaySimple(CommitLog commitLog) throws IOException ++ public void replaySimple() throws IOException + { + int cellCount = 0; + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1") + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + cellCount += 1; - commitLog.add(rm1); ++ CommitLog.instance.add(rm1); + + final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2") + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + cellCount += 1; - commitLog.add(rm2); ++ CommitLog.instance.add(rm2); + - commitLog.sync(true); ++ CommitLog.instance.sync(true); + - Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE); - List<String> activeSegments = commitLog.getActiveSegmentNames(); ++ Replayer replayer = new Replayer(CommitLog.instance, ReplayPosition.NONE); ++ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); + Assert.assertFalse(activeSegments.isEmpty()); + - File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name)); ++ File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name)); + replayer.recover(files); + + assertEquals(cellCount, replayer.cells); + } + - private void replayWithDiscard(CommitLog commitLog) throws IOException ++ @Test ++ public void replayWithDiscard() throws IOException + { + int cellCount = 0; + int max = 1024; + int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay + ReplayPosition replayPosition = null; + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + for (int i = 0; i < max; i++) + { + final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1) + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); - ReplayPosition position = commitLog.add(rm1); ++ ReplayPosition position = CommitLog.instance.add(rm1); + + if (i == discardPosition) + replayPosition = position; + if (i > discardPosition) + { + cellCount += 1; + } + } + - commitLog.sync(true); ++ CommitLog.instance.sync(true); + - Replayer replayer = new Replayer(commitLog, replayPosition); - List<String> activeSegments = commitLog.getActiveSegmentNames(); ++ Replayer replayer = new Replayer(CommitLog.instance, replayPosition); ++ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); + Assert.assertFalse(activeSegments.isEmpty()); + - File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name)); ++ File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name)); + replayer.recover(files); + + assertEquals(cellCount, replayer.cells); + } + + class Replayer extends CommitLogReplayer + { + private final ReplayPosition filterPosition; + int cells; + int skipped; + + Replayer(CommitLog commitLog, ReplayPosition filterPosition) + { + super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create()); + this.filterPosition = filterPosition; + } + + @SuppressWarnings("resource") + void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) throws IOException + { + if (entryLocation <= filterPosition.position) + { + // Skip over this mutation. + skipped++; + return; + } + + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size); + Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL); + for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates()) + for (Row row : partitionUpdate) + cells += Iterables.size(row.cells()); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java index 69764e6,3538bd1..c8a6033 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java @@@ -100,12 -97,11 +100,12 @@@ public class CommitLogUpgradeTestMake public void makeLog() throws IOException, InterruptedException { CommitLog commitLog = CommitLog.instance; - System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n", + System.out.format("\nUsing commit log size: %dmb, compressor: %s, encryption: %s, sync: %s, %s\n", mb(DatabaseDescriptor.getCommitLogSegmentSize()), - commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", - commitLog.encryptionContext.isEnabled() ? "enabled" : "none", + commitLog.configuration.getCompressorName(), ++ commitLog.configuration.useEncryption(), commitLog.executor.getClass().getSimpleName(), - randomSize ? " random size" : ""); + randomSize ? "random size" : ""); final List<CommitlogExecutor> threads = new ArrayList<>(); ScheduledExecutorService scheduled = startThreads(commitLog, threads);