This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5f23e6d  Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points
5f23e6d is described below

commit 5f23e6d766a18782db82d955bf380239990d2c84
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Wed Jun 23 14:25:12 2021 -0500

    Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points
    
    patch by Caleb Rackliffe; reviewed by Yifan Cai for CASSANDRA-16701
---
 CHANGES.txt                                          |  1 +
 .../cassandra/db/commitlog/CommitLogSegment.java     | 18 ++++++++++++------
 .../apache/cassandra/metrics/CommitLogMetrics.java   |  3 +++
 .../cassandra/db/commitlog/BatchCommitLogTest.java   | 20 ++++++++++++++++++++
 4 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 85ad263..91963bd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
  * Add a system property to set hostId if not yet initialized (CASSANDRA-14582)
  * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651)
 Merged from 4.0:
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 5303de9..246657f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -366,7 +366,11 @@ public abstract class CommitLogSegment
 
         if (flush || close)
         {
-            flush(startMarker, sectionEnd);
+            try (Timer.Context ignored = 
CommitLog.instance.metrics.waitingOnFlush.time())
+            {
+                flush(startMarker, sectionEnd);
+            }
+            
             if (cdcState == CDCState.CONTAINS)
                 writeCDCIndexFile(descriptor, sectionEnd, close);
             lastSyncedOffset = lastMarkerOffset = nextMarker;
@@ -499,13 +503,12 @@ public abstract class CommitLogSegment
         }
     }
 
-    void waitForSync(int position, Timer waitingOnCommit)
+    void waitForSync(int position)
     {
         while (lastSyncedOffset < position)
         {
-            WaitQueue.Signal signal = waitingOnCommit != null ?
-                                      
syncComplete.register(waitingOnCommit.time()) :
-                                      syncComplete.register();
+            WaitQueue.Signal signal = syncComplete.register();
+            
             if (lastSyncedOffset < position)
                 signal.awaitUninterruptibly();
             else
@@ -742,7 +745,10 @@ public abstract class CommitLogSegment
 
         void awaitDiskSync(Timer waitingOnCommit)
         {
-            segment.waitForSync(position, waitingOnCommit);
+            try (Timer.Context ignored = waitingOnCommit.time())
+            {
+                segment.waitForSync(position);
+            }
         }
 
         /**
diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java 
b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
index a3302bc..cb53575 100644
--- a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
@@ -42,6 +42,8 @@ public class CommitLogMetrics
     public final Timer waitingOnSegmentAllocation;
     /** The time spent waiting on CL sync; for Periodic this is only occurs 
when the sync is lagging its sync interval */
     public final Timer waitingOnCommit;
+    /** Time spent actually flushing the contents of a buffer to disk */
+    public final Timer waitingOnFlush;
     /** Number and rate of oversized mutations */
     public final Meter oversizedMutations;
 
@@ -49,6 +51,7 @@ public class CommitLogMetrics
     {
         waitingOnSegmentAllocation = 
Metrics.timer(factory.createMetricName("WaitingOnSegmentAllocation"));
         waitingOnCommit = 
Metrics.timer(factory.createMetricName("WaitingOnCommit"));
+        waitingOnFlush = 
Metrics.timer(factory.createMetricName("WaitingOnFlush"));
         oversizedMutations = 
Metrics.meter(factory.createMetricName("OverSizedMutations"));
     }
 
diff --git 
a/test/unit/org/apache/cassandra/db/commitlog/BatchCommitLogTest.java 
b/test/unit/org/apache/cassandra/db/commitlog/BatchCommitLogTest.java
index fb7dda1..7336e03 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/BatchCommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/BatchCommitLogTest.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.security.EncryptionContext;
 
+import static org.junit.Assert.assertEquals;
+
 public class BatchCommitLogTest extends CommitLogTest
 {
     private static final long CL_BATCH_SYNC_WINDOW = 1000; // 1 second
@@ -75,4 +77,22 @@ public class BatchCommitLogTest extends CommitLogTest
         Assert.assertTrue("Expect batch commitlog shutdown immediately, but 
took " + delta, delta < CL_BATCH_SYNC_WINDOW);
         CommitLog.instance.start();
     }
+
+    @Test
+    public void testFlushAndWaitingMetrics()
+    {
+        ColumnFamilyStore cfs1 = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+        Mutation m = new RowUpdateBuilder(cfs1.metadata.get(), 0, 
"key").clustering("bytes")
+                                                                        
.add("val", ByteBuffer.allocate(10 * 1024))
+                                                                        
.build();
+
+        long startingFlushCount = 
CommitLog.instance.metrics.waitingOnFlush.getCount();
+        long startingWaitCount = 
CommitLog.instance.metrics.waitingOnCommit.getCount();
+
+        CommitLog.instance.add(m);
+
+        // We should register single new flush and waiting data points.
+        assertEquals(startingFlushCount + 1, 
CommitLog.instance.metrics.waitingOnFlush.getCount());
+        assertEquals(startingWaitCount + 1, 
CommitLog.instance.metrics.waitingOnCommit.getCount());
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to