Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 eb05025c0 -> 05cb556f9 refs/heads/cassandra-3.11 d577918ba -> c3a1a4fa8 refs/heads/trunk d274c6ac0 -> 2402acd47
More frequent commitlog chained markers patch by jasobrown; reviewed by Sam Tunnecliffe for CASSANDRA-13987 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05cb556f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05cb556f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05cb556f Branch: refs/heads/cassandra-3.0 Commit: 05cb556f90dbd1929a180254809e05620265419b Parents: eb05025 Author: Jason Brown <jasobr...@apple.com> Authored: Tue Oct 17 15:37:13 2017 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Tue Dec 5 05:06:38 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 10 +- .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../db/commitlog/AbstractCommitLogService.java | 86 ++++++++++--- .../cassandra/db/commitlog/CommitLog.java | 4 +- .../db/commitlog/CommitLogSegment.java | 85 ++++++++---- .../db/commitlog/CompressedSegment.java | 12 ++ .../db/commitlog/MemoryMappedSegment.java | 7 +- .../db/commitlog/PeriodicCommitLogService.java | 2 +- .../commitlog/CommitLogChainedMarkersTest.java | 128 +++++++++++++++++++ .../CommitLogSegmentBackpressureTest.java | 4 +- .../cassandra/db/commitlog/CommitLogTest.java | 6 +- .../db/commitlog/CommitLogTestReplayer.java | 2 +- 14 files changed, 304 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6c50a3f..2683dc2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.16 + * More frequent commitlog chained markers (CASSANDRA-13987) * Fix serialized size of DataLimits (CASSANDRA-14057) * Add flag to allow dropping oversized read repair mutations 
(CASSANDRA-13975) * Fix SSTableLoader logger message (CASSANDRA-14003) http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index b783090..71e0b2a 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -300,10 +300,18 @@ counter_cache_save_period: 7200 # # the other option is "periodic" where writes may be acked immediately # and the CommitLog is simply synced every commitlog_sync_period_in_ms -# milliseconds. +# milliseconds. commitlog_sync: periodic commitlog_sync_period_in_ms: 10000 +# Time interval in millis at which we should update the chained markers in the commitlog. +# This allows more of the commitlog to be replayed from the mmapped file +# if the cassandra process crashes; this does not help in durability for surviving a host failure. +# This value only makes sense if it is significantly less than commitlog_sync_period_in_ms, +# and only applies to periodic mode when not using commitlog compression or encryption. +# commitlog_marker_period_in_ms: 100 + + # The size of the individual commitlog file segments. 
A commitlog # segment may be archived, deleted, or recycled once all the data # in it (potentially from each columnfamily in the system) has been http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 64d41bb..0796183 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -192,6 +192,7 @@ public class Config public CommitLogSync commitlog_sync; public Double commitlog_sync_batch_window_in_ms; public Integer commitlog_sync_period_in_ms; + public Integer commitlog_marker_period_in_ms = 0; public int commitlog_segment_size_in_mb = 32; public ParameterizedClass commitlog_compression; public int commitlog_max_compression_buffers_in_pool = 3; http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index efc71ef..169ed3d 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1519,6 +1519,16 @@ public class DatabaseDescriptor conf.commitlog_sync_period_in_ms = periodMillis; } + public static void setCommitLogMarkerPeriod(int markerPeriod) + { + conf.commitlog_marker_period_in_ms = markerPeriod; + } + + public static int getCommitLogMarkerPeriod() + { + return conf.commitlog_marker_period_in_ms; + } + public static Config.CommitLogSync getCommitLogSync() { return conf.commitlog_sync; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index e5a5887..8a03b2f 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -18,6 +18,9 @@ package org.apache.cassandra.db.commitlog; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.WaitQueue; import org.slf4j.*; @@ -26,8 +29,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; - public abstract class AbstractCommitLogService { @@ -47,7 +48,24 @@ public abstract class AbstractCommitLogService final CommitLog commitLog; private final String name; - private final long pollIntervalMillis; + + /** + * The duration between syncs to disk. + */ + private final long syncIntervalMillis; + + /** + * The duration between updating the chained markers in the commit log file. This value should be + * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}. + */ + private final long markerIntervalMillis; + + /** + * A flag that callers outside of the sync thread can use to signal they want the commitlog segments + * to be flushed to disk. 
Note: this flag is primarily to support commit log's batch mode, which requires + * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}. + */ + private volatile boolean syncRequested; private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class); @@ -57,18 +75,45 @@ public abstract class AbstractCommitLogService * * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue. */ - AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis) + AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis) + { + this(commitLog, name, syncIntervalMillis, syncIntervalMillis); + } + + /** + * CommitLogService provides a fsync service for Allocations, fulfilling either the + * Batch or Periodic contract. + * + * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue. + */ + AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis) { this.commitLog = commitLog; this.name = name; - this.pollIntervalMillis = pollIntervalMillis; + this.syncIntervalMillis = syncIntervalMillis; + + // if we are not using periodic mode, or we are using compression, we shouldn't update the chained markers + // faster than the sync interval + if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression()) + markerIntervalMillis = syncIntervalMillis; + + // apply basic bounds checking on the marker interval + if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis) + { + logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval", + markerIntervalMillis, syncIntervalMillis); + markerIntervalMillis = syncIntervalMillis; + } + + this.markerIntervalMillis = markerIntervalMillis; } 
// Separated into individual method to ensure relevant objects are constructed before this is started. void start() { - if (pollIntervalMillis < 1) - throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis)); + if (syncIntervalMillis < 1) + throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms", + syncIntervalMillis * 1e-6)); Runnable runnable = new Runnable() { @@ -89,16 +134,24 @@ public abstract class AbstractCommitLogService run = !shutdown; // sync and signal - long syncStarted = System.currentTimeMillis(); - //This is a target for Byteman in CommitLogSegmentManagerTest - commitLog.sync(shutdown); - lastSyncedAt = syncStarted; - syncComplete.signalAll(); - + long pollStarted = System.currentTimeMillis(); + if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested) + { + // in this branch, we want to flush the commit log to disk + commitLog.sync(shutdown, true); + syncRequested = false; + lastSyncedAt = pollStarted; + syncComplete.signalAll(); + } + else + { + // in this branch, just update the commit log sync headers + commitLog.sync(false, false); + } // sleep any time we have left before the next one is due long now = System.currentTimeMillis(); - long sleep = syncStarted + pollIntervalMillis - now; + long sleep = pollStarted + markerIntervalMillis - now; if (sleep < 0) { // if we have lagged noticeably, update our lag counter @@ -111,7 +164,7 @@ public abstract class AbstractCommitLogService lagCount++; } syncCount++; - totalSyncDuration += now - syncStarted; + totalSyncDuration += now - pollStarted; if (firstLagAt > 0) { @@ -149,7 +202,7 @@ public abstract class AbstractCommitLogService // sleep for full poll-interval after an error, so we don't spam the log file try { - haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS); + haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS); } catch 
(InterruptedException e) { @@ -180,6 +233,7 @@ public abstract class AbstractCommitLogService */ public WaitQueue.Signal requestExtraSync() { + syncRequested = true; WaitQueue.Signal signal = syncComplete.register(); haveWork.release(1); return signal; http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 40040ed..ff1b712 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -220,14 +220,14 @@ public class CommitLog implements CommitLogMBean /** * Forces a disk flush on the commit log files that need it. Blocking. */ - public void sync(boolean syncAllSegments) + public void sync(boolean syncAllSegments, boolean flush) { CommitLogSegment current = allocator.allocatingFrom(); for (CommitLogSegment segment : allocator.getActiveSegments()) { if (!syncAllSegments && segment.id > current.id) return; - segment.sync(); + segment.sync(flush); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 236a1b1..8834c8c 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -94,6 +94,12 @@ public abstract class CommitLogSegment // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker. 
private volatile int lastSyncedOffset; + /** + * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily + * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}. + */ + private volatile int lastMarkerOffset; + // The end position of the buffer. Initially set to its capacity and updated to point to the last written position // as the segment is being closed. // No need to be volatile as writes are protected by appendOrder barrier. @@ -167,7 +173,8 @@ public abstract class CommitLogSegment // write the header CommitLogDescriptor.writeHeader(buffer, descriptor); endOfBuffer = buffer.capacity(); - lastSyncedOffset = buffer.position(); + + lastSyncedOffset = lastMarkerOffset = buffer.position(); allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE); } @@ -232,7 +239,7 @@ public abstract class CommitLogSegment // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded void discardUnusedTail() { - // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom() + // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with ACLSM.advanceAllocatingFrom() // Ensures endOfBuffer update is reflected in the buffer end position picked up by sync(). // This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread // running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes, @@ -271,13 +278,18 @@ public abstract class CommitLogSegment } /** - * Forces a disk flush for this segment file. + * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file. + * + * @param flush true if the segment should flush to disk; else, false for just updating the chained markers. 
*/ - synchronized void sync() + synchronized void sync(boolean flush) { - boolean close = false; + assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d", + lastMarkerOffset, lastSyncedOffset); // check we have more work to do - if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE) + final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE; + final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset; + if (!(needToMarkData || hasDataToFlush)) return; // Note: Even if the very first allocation of this sync section failed, we still want to enter this // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer, @@ -285,31 +297,48 @@ public abstract class CommitLogSegment // succeeded in the previous sync. assert buffer != null; // Only close once. - int startMarker = lastSyncedOffset; - // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate - // the point at which we can safely consider records to have been completely written to. - int nextMarker = allocate(SYNC_MARKER_SIZE); - if (nextMarker < 0) + boolean close = false; + int startMarker = lastMarkerOffset; + int nextMarker, sectionEnd; + if (needToMarkData) { - // Ensure no more of this CLS is writeable, and mark ourselves for closing. - discardUnusedTail(); - close = true; - - // We use the buffer size as the synced position after a close instead of the end of the actual data - // to make sure we only close the buffer once. - // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread). - nextMarker = buffer.capacity(); + // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate + // the point at which we can safely consider records to have been completely written to. 
+ nextMarker = allocate(SYNC_MARKER_SIZE); + if (nextMarker < 0) + { + // Ensure no more of this CLS is writeable, and mark ourselves for closing. + discardUnusedTail(); + close = true; + + // We use the buffer size as the synced position after a close instead of the end of the actual data + // to make sure we only close the buffer once. + // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread). + nextMarker = buffer.capacity(); + } + // Wait for mutations to complete as well as endOfBuffer to have been written. + waitForModifications(); + sectionEnd = close ? endOfBuffer : nextMarker; + + // Possibly perform compression or encryption and update the chained markers + write(startMarker, sectionEnd); + lastMarkerOffset = sectionEnd; + } + else + { + // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush + // and any mutations have already been fully written into the segment (as we wait for it in the previous block). + nextMarker = lastMarkerOffset; + sectionEnd = nextMarker; } - // Wait for mutations to complete as well as endOfBuffer to have been written. - waitForModifications(); - int sectionEnd = close ? endOfBuffer : nextMarker; - // Perform compression, writing to file and flush. - write(startMarker, sectionEnd); + if (flush || close) + { + flush(startMarker, sectionEnd); + lastSyncedOffset = lastMarkerOffset = nextMarker; + } - // Signal the sync as complete. 
- lastSyncedOffset = nextMarker; if (close) internalClose(); syncComplete.signalAll(); @@ -327,6 +356,8 @@ public abstract class CommitLogSegment abstract void write(int lastSyncedOffset, int nextMarker); + abstract void flush(int startMarker, int nextMarker); + public boolean isStillAllocating() { return allocatePosition.get() < endOfBuffer; @@ -404,7 +435,7 @@ public abstract class CommitLogSegment synchronized void close() { discardUnusedTail(); - sync(); + sync(true); assert buffer == null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index c00ce18..8e05112 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@ -139,6 +139,18 @@ public class CompressedSegment extends CommitLogSegment channel.write(compressedBuffer); assert channel.position() - lastWrittenPos == compressedBuffer.limit(); lastWrittenPos = channel.position(); + } + catch (Exception e) + { + throw new FSWriteError(e, getPath()); + } + } + + @Override + protected void flush(int startMarker, int nextMarker) + { + try + { SyncUtil.force(channel, true); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java index 3a16d91..8259f04 100644 --- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java @@ -78,8 
+78,13 @@ public class MemoryMappedSegment extends CommitLogSegment // write previous sync marker to point to next sync marker // we don't chain the crcs here to ensure this method is idempotent if it fails writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker); + } - try { + @Override + protected void flush(int startMarker, int nextMarker) + { + try + { SyncUtil.force((MappedByteBuffer) buffer); } catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java index 86a248b..76419b7 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java @@ -26,7 +26,7 @@ class PeriodicCommitLogService extends AbstractCommitLogService public PeriodicCommitLogService(final CommitLog commitLog) { - super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod()); + super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod()); } protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java new file mode 100644 index 0000000..e2b9f72 --- /dev/null +++ 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.RebufferingInputStream; +import org.apache.cassandra.schema.KeyspaceParams; +import org.jboss.byteman.contrib.bmunit.BMRule; +import 
org.jboss.byteman.contrib.bmunit.BMUnitRunner; + +/** + * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers + * in the commit log segment but do not flush the file to disk. + */ +@RunWith(BMUnitRunner.class) +public class CommitLogChainedMarkersTest +{ + private static final String KEYSPACE1 = "CommitLogTest"; + private static final String STANDARD1 = "CommitLogChainedMarkersTest"; + + @Test + @BMRule(name = "force all calls to sync() to not flush to disk", + targetClass = "CommitLogSegment", + targetMethod = "sync(boolean)", + action = "$flush = false") + public void replayCommitLogWithoutFlushing() throws IOException + { + DatabaseDescriptor.setCommitLogSegmentSize(5); + DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000); + DatabaseDescriptor.setCommitLogMarkerPeriod(1); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance)); + + CompactionManager.instance.disableAutoCompaction(); + + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + byte[] entropy = new byte[1024]; + new Random().nextBytes(entropy); + final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.wrap(entropy)) + .build(); + + int samples = 10000; + for (int i = 0; i < samples; i++) + CommitLog.instance.add(m); + + CommitLog.instance.sync(false, true); + + Replayer replayer = new Replayer(cfs1.metadata); + File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation()); + replayer.recover(commitLogDir.listFiles()); + Assert.assertEquals(samples, replayer.count); + } + + private static class Replayer extends CommitLogReplayer + { + private final CFMetaData cfm; + private int count; + + Replayer(CFMetaData cfm) + { + 
super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create()); + this.cfm = cfm; + } + + @Override + void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) + { + RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size); + try + { + Mutation mutation = Mutation.serializer.deserialize(bufIn, + desc.getMessagingVersion(), + SerializationHelper.Flag.LOCAL); + + if (cfm == null || mutation.get(cfm) != null) + count++; + } + catch (IOException e) + { + // Test fails. + throw new AssertionError(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java index b651098..a1999ef 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java @@ -66,12 +66,12 @@ public class CommitLogSegmentBackpressureTest @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync", targetClass = "AbstractCommitLogService$1", targetMethod = "run", - targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", + targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)", action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"), @BMRule(name = "Release Semaphore after sync", targetClass = "AbstractCommitLogService$1", targetMethod = "run", - targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", + targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)", action = 
"org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")}) public void testCompressedCommitLogBackpressure() throws Throwable { http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 9e9ee53..b8f68ed 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -339,7 +339,7 @@ public class CommitLogTest // "Flush": this won't delete anything UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); - CommitLog.instance.sync(true); + CommitLog.instance.sync(true, true); CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext()); assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); @@ -652,7 +652,7 @@ public class CommitLogTest DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); } - CommitLog.instance.sync(true); + CommitLog.instance.sync(true, true); System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog. // If retries work subsequent flushes should clear up error and this should change to expect 0. @@ -685,7 +685,7 @@ public class CommitLogTest for (SSTableReader reader : cfs.getLiveSSTables()) reader.reloadSSTableMetadata(); - CommitLog.instance.sync(true); + CommitLog.instance.sync(true, true); System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have // persisted all data in the commit log. 
Because we know there was an error, there must be something left to http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java index e690785..36973f2 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java @@ -38,7 +38,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer { public static void examineCommitLog(Predicate<Mutation> processor) throws IOException { - CommitLog.instance.sync(true); + CommitLog.instance.sync(true, true); CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor); File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org