Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 0d1216916 -> db788fe86 refs/heads/cassandra-3.11 e646e5032 -> c000827af refs/heads/trunk 7a40abb6a -> d569c831c
Improve commit log chain marker updating patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-14108 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db788fe8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db788fe8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db788fe8 Branch: refs/heads/cassandra-3.0 Commit: db788fe860dfd69f06ab97ae35fa67fcf2517b6d Parents: 0d12169 Author: Jason Brown <[email protected]> Authored: Mon Dec 11 16:25:29 2017 -0800 Committer: Jason Brown <[email protected]> Committed: Wed Dec 13 19:51:34 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 8 - .../org/apache/cassandra/config/Config.java | 1 - .../cassandra/config/DatabaseDescriptor.java | 10 - .../db/commitlog/AbstractCommitLogService.java | 250 +++++++++++-------- .../db/commitlog/PeriodicCommitLogService.java | 3 +- src/java/org/apache/cassandra/utils/Clock.java | 80 ++++++ .../commitlog/AbstractCommitLogServiceTest.java | 176 +++++++++++++ .../commitlog/CommitLogChainedMarkersTest.java | 1 - .../CommitLogSegmentBackpressureTest.java | 8 +- .../cassandra/utils/FreeRunningClock.java | 46 ++++ 11 files changed, 449 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6bfddcc..ee90a67 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.16 + * Improve commit log chain marker updating (CASSANDRA-14108) * Extra range tombstone bound creates double rows (CASSANDRA-14008) * Fix SStable ordering by max timestamp in SinglePartitionReadCommand (CASSANDRA-14010) * Accept role names containing forward-slash (CASSANDRA-14088) http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 71e0b2a..ef7b034 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -304,14 +304,6 @@ counter_cache_save_period: 7200 commitlog_sync: periodic commitlog_sync_period_in_ms: 10000 -# Time interval in millis at which we should update the chained markers in the commitlog. -# This allows more of the commitlog to be replayed from the mmapped file -# if the cassandra process crashes; this does not help in durability for surviving a host fail. -# This value only makes sense if it is significantly less that commitlog_sync_period_in_ms, -# and only applies to periodic mode when not using commitlog compression or encryption. -# commitlog_marker_period_in_ms: 100 - - # The size of the individual commitlog file segments. A commitlog # segment may be archived, deleted, or recycled once all the data # in it (potentially from each columnfamily in the system) has been http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 0796183..64d41bb 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -192,7 +192,6 @@ public class Config public CommitLogSync commitlog_sync; public Double commitlog_sync_batch_window_in_ms; public Integer commitlog_sync_period_in_ms; - public Integer commitlog_marker_period_in_ms = 0; public int commitlog_segment_size_in_mb = 32; public ParameterizedClass commitlog_compression; public int commitlog_max_compression_buffers_in_pool = 3; http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 169ed3d..efc71ef 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1519,16 +1519,6 @@ public class DatabaseDescriptor conf.commitlog_sync_period_in_ms = periodMillis; } - public static void setCommitLogMarkerPeriod(int markerPeriod) - { - conf.commitlog_marker_period_in_ms = markerPeriod; - } - - public static int getCommitLogMarkerPeriod() - { - return conf.commitlog_marker_period_in_ms; - } - public static Config.CommitLogSync getCommitLogSync() { return conf.commitlog_sync; http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 8a03b2f..829530d 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -19,8 +19,8 @@ package org.apache.cassandra.db.commitlog; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.WaitQueue; import org.slf4j.*; @@ -31,6 +31,11 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractCommitLogService { + /** + * When in {@link Config.CommitLogSync#periodic} mode, the default number of milliseconds to wait between updating + * the commit log chained markers. + */ + static final long DEFAULT_MARKER_INTERVAL_MILLIS = 100; private Thread thread; private volatile boolean shutdown = false; @@ -52,13 +57,13 @@ public abstract class AbstractCommitLogService /** * The duration between syncs to disk. */ - private final long syncIntervalMillis; + final long syncIntervalMillis; /** * The duration between updating the chained markers in the the commit log file. This value should be * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}. */ - private final long markerIntervalMillis; + final long markerIntervalMillis; /** * A flag that callers outside of the sync thread can use to signal they want the commitlog segments @@ -75,9 +80,9 @@ public abstract class AbstractCommitLogService * * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue. */ - AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis) + AbstractCommitLogService(final CommitLog commitLog, final String name, long syncIntervalMillis) { - this(commitLog, name, syncIntervalMillis, syncIntervalMillis); + this (commitLog, name, syncIntervalMillis, false); } /** @@ -85,138 +90,163 @@ public abstract class AbstractCommitLogService * Batch or Periodic contract. * * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue. + * + * @param markHeadersFaster true if the chained markers should be updated more frequently than on the disk sync bounds. */ - AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis) + AbstractCommitLogService(final CommitLog commitLog, final String name, long syncIntervalMillis, boolean markHeadersFaster) { this.commitLog = commitLog; this.name = name; - this.syncIntervalMillis = syncIntervalMillis; - // if we are not using periodic mode, or we using compression, we shouldn't update the chained markers - // faster than the sync interval - if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression()) - markerIntervalMillis = syncIntervalMillis; + if (markHeadersFaster && syncIntervalMillis > DEFAULT_MARKER_INTERVAL_MILLIS) + { + markerIntervalMillis = DEFAULT_MARKER_INTERVAL_MILLIS; + long modulo = syncIntervalMillis % markerIntervalMillis; + if (modulo != 0) + { + // quantize syncIntervalMillis to a multiple of markerIntervalMillis + syncIntervalMillis -= modulo; - // apply basic bounds checking on the marker interval - if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis) + if (modulo >= markerIntervalMillis / 2) + syncIntervalMillis += markerIntervalMillis; + } + logger.debug("Will update the commitlog markers every {}ms and flush every {}ms", markerIntervalMillis, syncIntervalMillis); + } + else { - logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval", - markerIntervalMillis, syncIntervalMillis); markerIntervalMillis = syncIntervalMillis; } - this.markerIntervalMillis = markerIntervalMillis; + assert syncIntervalMillis % markerIntervalMillis == 0; + this.syncIntervalMillis = syncIntervalMillis; } // Separated into individual method to ensure relevant objects are constructed before this is started. void start() { if (syncIntervalMillis < 1) - throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms", - syncIntervalMillis * 1e-6)); + throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", + syncIntervalMillis)); + shutdown = false; + Runnable runnable = new SyncRunnable(new Clock()); + thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name); + thread.start(); + } - Runnable runnable = new Runnable() + class SyncRunnable implements Runnable + { + final Clock clock; + long firstLagAt = 0; + long totalSyncDuration = 0; // total time spent syncing since firstLagAt + long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt + int lagCount = 0; + int syncCount = 0; + + SyncRunnable(Clock clock) { - public void run() + this.clock = clock; + } + + public void run() + { + while (true) { - long firstLagAt = 0; - long totalSyncDuration = 0; // total time spent syncing since firstLagAt - long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt - int lagCount = 0; - int syncCount = 0; - - boolean run = true; - while (run) + if (!sync()) + break; + } + } + + boolean sync() + { + try + { + // always run once after shutdown signalled + boolean run = !shutdown; + + // sync and signal + long pollStarted = clock.currentTimeMillis(); + if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested) { - try - { - // always run once after shutdown signalled - run = !shutdown; - - // sync and signal - long pollStarted = System.currentTimeMillis(); - if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested) - { - // in this branch, we want to flush the commit log to disk - commitLog.sync(shutdown, true); - syncRequested = false; - lastSyncedAt = pollStarted; - syncComplete.signalAll(); - } - else - { - // in this branch, just update the commit log sync headers - commitLog.sync(false, false); - } - - // sleep any time we have left before the next one is due - long now = System.currentTimeMillis(); - long sleep = pollStarted + markerIntervalMillis - now; - if (sleep < 0) - { - // if we have lagged noticeably, update our lag counter - if (firstLagAt == 0) - { - firstLagAt = now; - totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; - } - syncExceededIntervalBy -= sleep; - lagCount++; - } - syncCount++; - totalSyncDuration += now - pollStarted; - - if (firstLagAt > 0) - { - //Only reset the lag tracking if it actually logged this time - boolean logged = NoSpamLogger.log( - logger, - NoSpamLogger.Level.WARN, - 5, - TimeUnit.MINUTES, - "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", - syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount)); - if (logged) - firstLagAt = 0; - } - - // if we have lagged this round, we probably have work to do already so we don't sleep - if (sleep < 0 || !run) - continue; - - try - { - haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS); - haveWork.drainPermits(); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } - } - catch (Throwable t) + // in this branch, we want to flush the commit log to disk + commitLog.sync(shutdown, true); + syncRequested = false; + lastSyncedAt = pollStarted; + syncComplete.signalAll(); + } + else + { + // in this branch, just update the commit log sync headers + commitLog.sync(false, false); + } + + // sleep any time we have left before the next one is due + long now = clock.currentTimeMillis(); + long sleep = pollStarted + markerIntervalMillis - now; + if (sleep < 0) + { + // if we have lagged noticeably, update our lag counter + if (firstLagAt == 0) { - if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) - break; - - // sleep for full poll-interval after an error, so we don't spam the log file - try - { - haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } + firstLagAt = now; + totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; } + syncExceededIntervalBy -= sleep; + lagCount++; + } + syncCount++; + totalSyncDuration += now - pollStarted; + + if (firstLagAt > 0) + { + //Only reset the lag tracking if it actually logged this time + boolean logged = NoSpamLogger.log( + logger, + NoSpamLogger.Level.WARN, + 5, + TimeUnit.MINUTES, + "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", + syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount)); + if (logged) + firstLagAt = 0; + } + + if (!run) + return false; + + // if we have lagged this round, we probably have work to do already so we don't sleep + if (sleep < 0) + return true; + + try + { + haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS); + haveWork.drainPermits(); + } + catch (InterruptedException e) + { + throw new AssertionError(); } } - }; + catch (Throwable t) + { + if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) + return false; - thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name); - thread.start(); + // sleep for full poll-interval after an error, so we don't spam the log file + try + { + haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + } + return true; + } } + /** * Block for @param alloc to be sync'd as necessary, and handle bookkeeping */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java index 76419b7..7a09de0 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java @@ -26,7 +26,8 @@ class PeriodicCommitLogService extends AbstractCommitLogService public PeriodicCommitLogService(final CommitLog commitLog) { - super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod()); + super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), + !commitLog.configuration.useCompression()); } protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/utils/Clock.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Clock.java b/src/java/org/apache/cassandra/utils/Clock.java new file mode 100644 index 0000000..eb9822c --- /dev/null +++ b/src/java/org/apache/cassandra/utils/Clock.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper around time related functions that are either implemented by using the default JVM calls + * or by using a custom implementation for testing purposes. + * + * See {@link #instance} for how to use a custom implementation. + * + * Please note that {@link java.time.Clock} wasn't used, as it would not be possible to provide an + * implementation for {@link #nanoTime()} with the exact same properties of {@link System#nanoTime()}. + */ +public class Clock +{ + private static final Logger logger = LoggerFactory.getLogger(Clock.class); + + /** + * Static singleton object that will be instanciated by default with a system clock + * implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a + * different implementation instead. + */ + public static Clock instance; + + static + { + String sclock = System.getProperty("cassandra.clock"); + if (sclock == null) + { + instance = new Clock(); + } + else + { + try + { + logger.debug("Using custom clock implementation: {}", sclock); + instance = (Clock) Class.forName(sclock).newInstance(); + } + catch (Exception e) + { + logger.error(e.getMessage(), e); + } + } + } + + /** + * @see System#nanoTime() + */ + public long nanoTime() + { + return System.nanoTime(); + } + + /** + * @see System#currentTimeMillis() + */ + public long currentTimeMillis() + { + return System.currentTimeMillis(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java new file mode 100644 index 0000000..5a46e5f --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.commitlog; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FreeRunningClock; + +import static org.apache.cassandra.db.commitlog.AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS; + +public class AbstractCommitLogServiceTest +{ + @BeforeClass + public static void before() + { + DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000); + } + + @Test + public void testConstructorSyncIsQuantized() + { + long syncTimeMillis = 10 * 1000; + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); + Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis); + Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis); + } + + @Test + public void testConstructorSyncEqualsMarkerDefault() + { + long syncTimeMillis = 100; + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); + Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis); + Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis); + Assert.assertEquals(commitLogService.markerIntervalMillis, commitLogService.syncIntervalMillis); + } + + @Test + public void testConstructorSyncShouldRoundUp() + { + long syncTimeMillis = 151; + long expectedMillis = 200; + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); + Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis); + Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis); + } + + @Test + public void testConstructorSyncShouldRoundDown() + { + long syncTimeMillis = 121; + long expectedMillis = 100; + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); + Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis); + Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis); + } + + @Test + public void testConstructorSyncTinyValue() + { + long syncTimeMillis = 10; + long expectedNanos = syncTimeMillis; + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); + Assert.assertEquals(expectedNanos, commitLogService.markerIntervalMillis); + Assert.assertEquals(expectedNanos, commitLogService.syncIntervalMillis); + } + + private static class FakeCommitLogService extends AbstractCommitLogService + { + FakeCommitLogService(long syncIntervalMillis) + { + super(new FakeCommitLog(), "This is not a real commit log", syncIntervalMillis, true); + lastSyncedAt = 0; + } + + @Override + void start() + { + // nop + } + + protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) + { + // nop + } + } + + @Test + public void testSync() + { + long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2; + FreeRunningClock clock = new FreeRunningClock(); + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); + AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock); + FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog; + + // at time 0 + Assert.assertTrue(syncRunnable.sync()); + Assert.assertEquals(1, commitLog.markCount.get()); + Assert.assertEquals(0, commitLog.syncCount.get()); + + // at time DEFAULT_MARKER_INTERVAL_MILLIS + clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + Assert.assertTrue(syncRunnable.sync()); + Assert.assertEquals(2, commitLog.markCount.get()); + Assert.assertEquals(0, commitLog.syncCount.get()); + + // at time DEFAULT_MARKER_INTERVAL_MILLIS * 2 + clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + Assert.assertTrue(syncRunnable.sync()); + Assert.assertEquals(2, commitLog.markCount.get()); + Assert.assertEquals(1, commitLog.syncCount.get()); + + // at time DEFAULT_MARKER_INTERVAL_MILLIS * 3, but with shutdown! + clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + commitLogService.shutdown(); + Assert.assertFalse(syncRunnable.sync()); + Assert.assertEquals(2, commitLog.markCount.get()); + Assert.assertEquals(2, commitLog.syncCount.get()); + } + + private static class FakeCommitLog extends CommitLog + { + private final AtomicInteger markCount = new AtomicInteger(); + private final AtomicInteger syncCount = new AtomicInteger(); + + FakeCommitLog() + { + super(DatabaseDescriptor.getCommitLogLocation(), null); + } + + @Override + CommitLog start() + { + // this is a bit dicey. we need to start the allocator, but starting the parent's executor will muck things + // up as it is pointing to a different executor service, not the fake one in this test class. + allocator.start(); + return this; + } + + @Override + public void sync(boolean syncAllSegments, boolean flush) + { + if (flush) + syncCount.incrementAndGet(); + else + markCount.incrementAndGet(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java index e2b9f72..b73275b 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java @@ -65,7 +65,6 @@ public class CommitLogChainedMarkersTest DatabaseDescriptor.setCommitLogSegmentSize(5); DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic); DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000); - DatabaseDescriptor.setCommitLogMarkerPeriod(1); SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java index a1999ef..c615880 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java @@ -64,13 +64,13 @@ public class CommitLogSegmentBackpressureTest @Test @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync", - targetClass = "AbstractCommitLogService$1", - targetMethod = "run", + targetClass = "AbstractCommitLogService$SyncRunnable", + targetMethod = "sync", targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)", action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"), @BMRule(name = "Release Semaphore after sync", - targetClass = "AbstractCommitLogService$1", - targetMethod = "run", + targetClass = "AbstractCommitLogService$SyncRunnable", + targetMethod = "sync", targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)", action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")}) public void testCompressedCommitLogBackpressure() throws Throwable http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/utils/FreeRunningClock.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java new file mode 100644 index 0000000..83c8db7 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.concurrent.TimeUnit; + +/** + * A freely adjustable clock that can be used for unit testing. See {@link Clock#instance} how to + * enable this class. + */ +public class FreeRunningClock extends Clock +{ + private long nanoTime = 0; + + @Override + public long nanoTime() + { + return nanoTime; + } + + @Override + public long currentTimeMillis() + { + return TimeUnit.NANOSECONDS.toMillis(nanoTime()); + } + + public void advance(long time, TimeUnit unit) + { + nanoTime += unit.toNanos(time); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
