Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77a12053 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77a12053 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77a12053 Branch: refs/heads/trunk Commit: 77a12053b69ceebd529556d5159f9325703283eb Parents: b92d90d 214a3ab Author: Jason Brown <jasedbr...@gmail.com> Authored: Tue Jun 5 13:48:56 2018 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Tue Jun 5 13:50:36 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/commitlog/AbstractCommitLogService.java | 88 +++++++++++++------- .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++- 3 files changed, 105 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 2d4ef25,dfdfbfd..2e77d2e --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,21 -1,5 +1,22 @@@ -3.0.17 +3.11.3 + * Reduce nodetool GC thread count (CASSANDRA-14475) + * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055) + * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416) + * Update metrics to 3.1.5 (CASSANDRA-12924) + * Detect OpenJDK jvm type and architecture (CASSANDRA-12793) + * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271) + * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851) + * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299) + * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370) + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891) + * Serialize empty buffer as empty string for json output format (CASSANDRA-14245) + * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396) + * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247) + * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170) + * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163) + * Fix wildcard GROUP BY queries (CASSANDRA-14209) +Merged from 3.0: + * Fix regression of lagging commitlog flush log message (CASSANDRA-14451) * Add Missing dependencies in pom-all (CASSANDRA-14422) * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447) * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121) http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 7c5d300,0845bd5..b7ab705 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@@ -17,15 -17,6 +17,16 @@@ */ package org.apache.cassandra.db.commitlog; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + ++import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Timer.Context; + import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Config; import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; @@@ -162,14 -160,15 +163,15 @@@ public abstract class AbstractCommitLog boolean sync() { + // always run once after shutdown signalled + boolean shutdownRequested = shutdown; + try { - // always run once after shutdown signalled - boolean run = !shutdown; - // sync and signal - long pollStarted = clock.currentTimeMillis(); - boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested; + long pollStarted = clock.nanoTime(); - if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested) ++ boolean flushToDisk = lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested; + if (flushToDisk) { // in this branch, we want to flush the commit log to disk syncRequested = false; @@@ -181,47 -180,30 +183,19 @@@ else { // in this branch, just update the commit log sync headers - commitLog.sync(false, false); + commitLog.sync(false); } - // sleep any time we have left before the next one is due - long now = clock.currentTimeMillis(); + long now = clock.nanoTime(); - long wakeUpAt = pollStarted + markerIntervalNanos; - if (wakeUpAt < now) - { - // if we have lagged noticeably, update our lag counter - if (firstLagAt == 0) - { - firstLagAt = now; - totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; - } - syncExceededIntervalBy += now - wakeUpAt; - lagCount++; - } - totalSyncDuration += now - pollStarted; - - if (firstLagAt > 0) - { - //Only reset the lag tracking if it actually logged this time - boolean logged = NoSpamLogger.log(logger, - NoSpamLogger.Level.WARN, - 5, - TimeUnit.MINUTES, - "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", - syncCount, - String.format("%.2f", (now - firstLagAt) * 1e-9d), - String.format("%.2f", totalSyncDuration * 1e-6d / syncCount), - lagCount, - String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount)); - if (logged) - firstLagAt = 0; - } + if (flushToDisk) + maybeLogFlushLag(pollStarted, now); - if (!run) + if (shutdownRequested) return false; - // if we have lagged this round, we probably have work to do already so we don't sleep - long sleep = pollStarted + markerIntervalMillis - now; - if (sleep < 0) - return true; - - try - { - haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS); - haveWork.drainPermits(); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } ++ long wakeUpAt = pollStarted + markerIntervalNanos; + if (wakeUpAt > now) + LockSupport.parkNanos(wakeUpAt - now); } catch (Throwable t) { @@@ -229,13 -211,67 +203,63 @@@ return false; // sleep for full poll-interval after an error, so we don't spam the log file - try - { - haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } + LockSupport.parkNanos(markerIntervalNanos); } + return true; } + + /** - * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}. ++ * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalNanos}. + */ + @VisibleForTesting + boolean maybeLogFlushLag(long pollStarted, long now) + { + long flushDuration = now - pollStarted; + totalSyncDuration += flushDuration; + + // this is the timestamp by which we should have completed the flush - long maxFlushTimestamp = pollStarted + syncIntervalMillis; ++ long maxFlushTimestamp = pollStarted + syncIntervalNanos; + if (maxFlushTimestamp > now) + return false; + + // if we have lagged noticeably, update our lag counter + if (firstLagAt == 0) + { + firstLagAt = now; + syncExceededIntervalBy = lagCount = 0; + syncCount = 1; + totalSyncDuration = flushDuration; + } + syncExceededIntervalBy += now - maxFlushTimestamp; + lagCount++; + + if (firstLagAt > 0) + { + //Only reset the lag tracking if it actually logged this time - boolean logged = NoSpamLogger.log( - logger, - NoSpamLogger.Level.WARN, - 5, - TimeUnit.MINUTES, - "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", - syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount)); ++ boolean logged = NoSpamLogger.log(logger, ++ NoSpamLogger.Level.WARN, ++ 5, ++ TimeUnit.MINUTES, ++ "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", ++ syncCount, ++ String.format("%.2f", (now - firstLagAt) * 1e-9d), ++ String.format("%.2f", totalSyncDuration * 1e-6d / syncCount), ++ lagCount, ++ String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount)); + if (logged) + firstLagAt = 0; + } + return true; + } + + @VisibleForTesting + long getTotalSyncDuration() + { + return totalSyncDuration; + } } - /** * Block for @param alloc to be sync'd as necessary, and handle bookkeeping */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java index 18f15fa,6f51eaf..bc5cb29 --- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java @@@ -163,4 -174,50 +164,50 @@@ public class AbstractCommitLogServiceTe markCount.incrementAndGet(); } } + + @Test + public void maybeLogFlushLag_MustLog() + { + long syncTimeMillis = 10; + SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock()); + long pollStarted = 1; - long now = pollStarted + (syncTimeMillis * 2); ++ long now = Integer.MAX_VALUE; + Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now)); + Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration()); + } + + @Test + public void maybeLogFlushLag_NoLog() + { + long syncTimeMillis = 10; + SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock()); + long pollStarted = 1; + long now = pollStarted + (syncTimeMillis - 1); + Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now)); + Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration()); + } + + /** + * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly + */ + @Test + public void maybeLogFlushLag_MultipleOperations() + { + long syncTimeMillis = 10; + SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock()); + + long pollStarted = 1; + long now = pollStarted + (syncTimeMillis - 1); + + int runCount = 12; + for (int i = 1; i <= runCount; i++) + { + Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now)); + Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration()); + } + - now = pollStarted + (syncTimeMillis * 2); ++ now = pollStarted + Integer.MAX_VALUE; + Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now)); + Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org