Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77a12053
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77a12053
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77a12053

Branch: refs/heads/trunk
Commit: 77a12053b69ceebd529556d5159f9325703283eb
Parents: b92d90d 214a3ab
Author: Jason Brown <jasedbr...@gmail.com>
Authored: Tue Jun 5 13:48:56 2018 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Tue Jun 5 13:50:36 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/AbstractCommitLogService.java  | 88 +++++++++++++-------
 .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
 3 files changed, 105 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2d4ef25,dfdfbfd..2e77d2e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
 -3.0.17
 +3.11.3
 + * Reduce nodetool GC thread count (CASSANDRA-14475)
 + * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
 + * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
 + * Update metrics to 3.1.5 (CASSANDRA-12924)
 + * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
 + * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
 + * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
 + * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
 + * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
 + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
 + * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
 + * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
 + * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
 + * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
 + * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
 + * Fix wildcard GROUP BY queries (CASSANDRA-14209)
 +Merged from 3.0:
+  * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
   * Add Missing dependencies in pom-all (CASSANDRA-14422)
   * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
   * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 7c5d300,0845bd5..b7ab705
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,15 -17,6 +17,16 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.LockSupport;
 +
++import com.google.common.annotations.VisibleForTesting;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.codahale.metrics.Timer.Context;
 +
  import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
@@@ -162,14 -160,15 +163,15 @@@ public abstract class AbstractCommitLog
  
          boolean sync()
          {
 +            // always run once after shutdown signalled
 +            boolean shutdownRequested = shutdown;
 +
              try
              {
 -                // always run once after shutdown signalled
 -                boolean run = !shutdown;
 -
                  // sync and signal
 -                long pollStarted = clock.currentTimeMillis();
 -                boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= 
pollStarted || shutdown || syncRequested;
 +                long pollStarted = clock.nanoTime();
-                 if (lastSyncedAt + syncIntervalNanos <= pollStarted || 
shutdownRequested || syncRequested)
++                boolean flushToDisk = lastSyncedAt + syncIntervalNanos <= 
pollStarted || shutdownRequested || syncRequested;
+                 if (flushToDisk)
                  {
                      // in this branch, we want to flush the commit log to disk
                      syncRequested = false;
@@@ -181,47 -180,30 +183,19 @@@
                  else
                  {
                      // in this branch, just update the commit log sync headers
 -                    commitLog.sync(false, false);
 +                    commitLog.sync(false);
                  }
  
-                 // sleep any time we have left before the next one is due
 -                long now = clock.currentTimeMillis();
 +                long now = clock.nanoTime();
-                 long wakeUpAt = pollStarted + markerIntervalNanos;
-                 if (wakeUpAt < now)
-                 {
-                     // if we have lagged noticeably, update our lag counter
-                     if (firstLagAt == 0)
-                     {
-                         firstLagAt = now;
-                         totalSyncDuration = syncExceededIntervalBy = 
syncCount = lagCount = 0;
-                     }
-                     syncExceededIntervalBy += now - wakeUpAt;
-                     lagCount++;
-                 }
-                 totalSyncDuration += now - pollStarted;
- 
-                 if (firstLagAt > 0)
-                 {
-                     //Only reset the lag tracking if it actually logged this 
time
-                     boolean logged = NoSpamLogger.log(logger,
-                                                       NoSpamLogger.Level.WARN,
-                                                       5,
-                                                       TimeUnit.MINUTES,
-                                                       "Out of {} commit log 
syncs over the past {}s with average duration of {}ms, {} have exceeded the 
configured commit interval by an average of {}ms",
-                                                       syncCount,
-                                                       String.format("%.2f", 
(now - firstLagAt) * 1e-9d),
-                                                       String.format("%.2f", 
totalSyncDuration * 1e-6d / syncCount),
-                                                       lagCount,
-                                                       String.format("%.2f", 
syncExceededIntervalBy * 1e-6d / lagCount));
-                     if (logged)
-                         firstLagAt = 0;
-                 }
+                 if (flushToDisk)
+                     maybeLogFlushLag(pollStarted, now);
  
 -                if (!run)
 +                if (shutdownRequested)
                      return false;
  
 -                // if we have lagged this round, we probably have work to do 
already so we don't sleep
 -                long sleep = pollStarted + markerIntervalMillis - now;
 -                if (sleep < 0)
 -                    return true;
 -
 -                try
 -                {
 -                    haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
 -                    haveWork.drainPermits();
 -                }
 -                catch (InterruptedException e)
 -                {
 -                    throw new AssertionError();
 -                }
++                long wakeUpAt = pollStarted + markerIntervalNanos;
 +                if (wakeUpAt > now)
 +                    LockSupport.parkNanos(wakeUpAt - now);
              }
              catch (Throwable t)
              {
@@@ -229,13 -211,67 +203,63 @@@
                      return false;
  
                  // sleep for full poll-interval after an error, so we don't 
spam the log file
 -                try
 -                {
 -                    haveWork.tryAcquire(markerIntervalMillis, 
TimeUnit.MILLISECONDS);
 -                }
 -                catch (InterruptedException e)
 -                {
 -                    throw new AssertionError();
 -                }
 +                LockSupport.parkNanos(markerIntervalNanos);
              }
 +
              return true;
          }
+ 
+         /**
 -         * Add a log entry whenever the time to flush the commit log to disk 
exceeds {@link #syncIntervalMillis}.
++         * Add a log entry whenever the time to flush the commit log to disk 
exceeds {@link #syncIntervalNanos}.
+          */
+         @VisibleForTesting
+         boolean maybeLogFlushLag(long pollStarted, long now)
+         {
+             long flushDuration = now - pollStarted;
+             totalSyncDuration += flushDuration;
+ 
+             // this is the timestamp by which we should have completed the 
flush
 -            long maxFlushTimestamp = pollStarted + syncIntervalMillis;
++            long maxFlushTimestamp = pollStarted + syncIntervalNanos;
+             if (maxFlushTimestamp > now)
+                 return false;
+ 
+             // if we have lagged noticeably, update our lag counter
+             if (firstLagAt == 0)
+             {
+                 firstLagAt = now;
+                 syncExceededIntervalBy = lagCount = 0;
+                 syncCount = 1;
+                 totalSyncDuration = flushDuration;
+             }
+             syncExceededIntervalBy += now - maxFlushTimestamp;
+             lagCount++;
+ 
+             if (firstLagAt > 0)
+             {
+                 //Only reset the lag tracking if it actually logged this time
 -                boolean logged = NoSpamLogger.log(
 -                logger,
 -                NoSpamLogger.Level.WARN,
 -                5,
 -                TimeUnit.MINUTES,
 -                "Out of {} commit log syncs over the past {}s with average 
duration of {}ms, {} have exceeded the configured commit interval by an average 
of {}ms",
 -                syncCount, (now - firstLagAt) / 1000, String.format("%.2f", 
(double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", 
(double) syncExceededIntervalBy / lagCount));
++                boolean logged = NoSpamLogger.log(logger,
++                                                  NoSpamLogger.Level.WARN,
++                                                  5,
++                                                  TimeUnit.MINUTES,
++                                                  "Out of {} commit log syncs 
over the past {}s with average duration of {}ms, {} have exceeded the 
configured commit interval by an average of {}ms",
++                                                  syncCount,
++                                                  String.format("%.2f", (now 
- firstLagAt) * 1e-9d),
++                                                  String.format("%.2f", 
totalSyncDuration * 1e-6d / syncCount),
++                                                  lagCount,
++                                                  String.format("%.2f", 
syncExceededIntervalBy * 1e-6d / lagCount));
+                 if (logged)
+                     firstLagAt = 0;
+             }
+             return true;
+         }
+ 
+         @VisibleForTesting
+         long getTotalSyncDuration()
+         {
+             return totalSyncDuration;
+         }
      }
  
 -
      /**
       * Block for @param alloc to be sync'd as necessary, and handle 
bookkeeping
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 18f15fa,6f51eaf..bc5cb29
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@@ -163,4 -174,50 +164,50 @@@ public class AbstractCommitLogServiceTe
                  markCount.incrementAndGet();
          }
      }
+ 
+     @Test
+     public void maybeLogFlushLag_MustLog()
+     {
+         long syncTimeMillis = 10;
+         SyncRunnable syncRunnable = new 
FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+         long pollStarted = 1;
 -        long now = pollStarted + (syncTimeMillis * 2);
++        long now = Integer.MAX_VALUE;
+         Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+         Assert.assertEquals(now - pollStarted, 
syncRunnable.getTotalSyncDuration());
+     }
+ 
+     @Test
+     public void maybeLogFlushLag_NoLog()
+     {
+         long syncTimeMillis = 10;
+         SyncRunnable syncRunnable = new 
FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+         long pollStarted = 1;
+         long now = pollStarted + (syncTimeMillis - 1);
+         Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+         Assert.assertEquals(now - pollStarted, 
syncRunnable.getTotalSyncDuration());
+     }
+ 
+     /**
+      * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled 
correctly
+      */
+     @Test
+     public void maybeLogFlushLag_MultipleOperations()
+     {
+         long syncTimeMillis = 10;
+         SyncRunnable syncRunnable = new 
FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ 
+         long pollStarted = 1;
+         long now = pollStarted + (syncTimeMillis - 1);
+ 
+         int runCount = 12;
+         for (int i = 1; i <= runCount; i++)
+         {
+             Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, 
now));
+             Assert.assertEquals(i * (now - pollStarted), 
syncRunnable.getTotalSyncDuration());
+         }
+ 
 -        now = pollStarted + (syncTimeMillis * 2);
++        now = pollStarted + Integer.MAX_VALUE;
+         Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+         Assert.assertEquals(now - pollStarted, 
syncRunnable.getTotalSyncDuration());
+     }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to