This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0b15a81  GEODE-6889, GEODE-6890: remove sync when updating max stats 
(#3728)
0b15a81 is described below

commit 0b15a8119b7b42df3b69825f42b9e096556c91a0
Author: Murtuza Boxwala <mboxw...@pivotal.io>
AuthorDate: Fri Jun 21 12:38:15 2019 -0400

    GEODE-6889, GEODE-6890: remove sync when updating max stats (#3728)
    
    * GEODE-6889: Ensure the highest value is recorded to replyWaitMaxTime
    * GEODE-6890: Ensure the highest value is recorded to maxSentMessagesTime
    
    Co-authored-by: Murtuza Boxwala <mboxw...@pivotal.io>
---
 .../distributed/internal/DistributionStats.java    |  45 ++++-----
 .../geode/distributed/internal/MaxLongGauge.java   |  51 ++++++++++
 .../internal/DistributionStatsTest.java            |  60 ++++++++++++
 .../internal/MaxLongGaugeConcurrentTest.java       |  73 ++++++++++++++
 .../distributed/internal/MaxLongGaugeTest.java     | 105 +++++++++++++++++++++
 5 files changed, 309 insertions(+), 25 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index 13a77fb..14e6bae 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -22,6 +22,7 @@ import org.apache.geode.StatisticsFactory;
 import org.apache.geode.StatisticsType;
 import org.apache.geode.StatisticsTypeFactory;
 import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.logging.LogService;
@@ -48,8 +49,10 @@ public class DistributionStats implements DMStats {
   private static final int sentMessagesId;
   private static final int sentCommitMessagesId;
   private static final int commitWaitsId;
-  private static final int sentMessagesTimeId;
-  private static final int sentMessagesMaxTimeId;
+  @VisibleForTesting
+  static final int sentMessagesTimeId;
+  @VisibleForTesting
+  static final int sentMessagesMaxTimeId;
   private static final int broadcastMessagesId;
   private static final int broadcastMessagesTimeId;
   private static final int receivedMessagesId;
@@ -91,9 +94,11 @@ public class DistributionStats implements DMStats {
   private static final int serialQueueThrottleCountId;
   private static final int replyWaitsInProgressId;
   private static final int replyWaitsCompletedId;
-  private static final int replyWaitTimeId;
+  @VisibleForTesting
+  static final int replyWaitTimeId;
   private static final int replyTimeoutsId;
-  private static final int replyWaitMaxTimeId;
+  @VisibleForTesting
+  static final int replyWaitMaxTimeId;
   private static final int receiverConnectionsId;
   private static final int failedAcceptsId;
   private static final int failedConnectsId;
@@ -936,6 +941,9 @@ public class DistributionStats implements DMStats {
   /** The Statistics object that we delegate most behavior to */
   private final Statistics stats;
 
+  private final MaxLongGauge maxReplyWaitTime;
+  private final MaxLongGauge maxSentMessagesTime;
+
   // private final HistogramStats replyHandoffHistogram;
   // private final HistogramStats replyWaitHistogram;
 
@@ -946,7 +954,7 @@ public class DistributionStats implements DMStats {
    * factory.
    */
   public DistributionStats(StatisticsFactory f, long statId) {
-    this.stats = f.createAtomicStatistics(type, "distributionStats", statId);
+    this(f.createAtomicStatistics(type, "distributionStats", statId));
     // this.replyHandoffHistogram = new HistogramStats("ReplyHandOff", 
"nanoseconds", f,
     // new long[] {100000, 200000, 300000, 400000, 500000, 600000, 700000, 
800000, 900000, 1000000},
     // false);
@@ -960,6 +968,8 @@ public class DistributionStats implements DMStats {
    */
   public DistributionStats(Statistics stats) {
     this.stats = stats;
+    maxReplyWaitTime = new MaxLongGauge(replyWaitMaxTimeId, stats);
+    maxSentMessagesTime = new MaxLongGauge(sentMessagesMaxTimeId, stats);
     // this.replyHandoffHistogram = null;
     // this.replyWaitHistogram = null;
   }
@@ -1040,22 +1050,12 @@ public class DistributionStats implements DMStats {
   public void incSentMessagesTime(long nanos) {
     if (enableClockStats) {
       this.stats.incLong(sentMessagesTimeId, nanos);
-      long millis = nanos / 1000000;
-      if (getSentMessagesMaxTime() < millis) {
-        this.stats.setLong(sentMessagesMaxTimeId, millis);
-      }
+      long millis = NanoTimer.nanosToMillis(nanos);
+      maxSentMessagesTime.recordMax(millis);
     }
   }
 
   /**
-   * Returns the longest time required to distribute a message, in nanos
-   */
-  public long getSentMessagesMaxTime() {
-    return this.stats.getLong(sentMessagesMaxTimeId);
-  }
-
-
-  /**
    * Returns the total number of messages broadcast by the distribution manager
    */
   @Override
@@ -1376,10 +1376,6 @@ public class DistributionStats implements DMStats {
     return stats.getLong(replyWaitTimeId);
   }
 
-  public long getReplyWaitMaxTime() {
-    return stats.getLong(replyWaitMaxTimeId);
-  }
-
   @Override
   public long startSocketWrite(boolean sync) {
     if (sync) {
@@ -1600,6 +1596,7 @@ public class DistributionStats implements DMStats {
     return getStatTime();
   }
 
+
   @Override
   public void endReplyWait(long startNanos, long initTime) {
     if (enableClockStats) {
@@ -1607,10 +1604,8 @@ public class DistributionStats implements DMStats {
       // this.replyWaitHistogram.endOp(delta);
     }
     if (initTime != 0) {
-      long mswait = System.currentTimeMillis() - initTime;
-      if (mswait > getReplyWaitMaxTime()) {
-        stats.setLong(replyWaitMaxTimeId, mswait);
-      }
+      long waitTime = System.currentTimeMillis() - initTime;
+      maxReplyWaitTime.recordMax(waitTime);
     }
     stats.incInt(replyWaitsInProgressId, -1);
     stats.incInt(replyWaitsCompletedId, 1);
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/MaxLongGauge.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/MaxLongGauge.java
new file mode 100644
index 0000000..43c281a
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/MaxLongGauge.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.geode.Statistics;
+
+/**
+ * This class holds the max value of a stat inside an AtomicLong. Every time a 
higher value is found
+ * the class will forward the delta between the old max and the new max to the 
statistics class.
+ * This pattern is a lock-less alternative to calling Statistics.getLong and 
then updating the max.
+ */
+class MaxLongGauge {
+  private final int statId;
+  private final Statistics stats;
+  private final AtomicLong max;
+
+  public MaxLongGauge(int statId, Statistics stats) {
+    this.statId = statId;
+    this.stats = stats;
+    max = new AtomicLong();
+  }
+
+  public void recordMax(long currentValue) {
+    boolean done = false;
+    while (!done) {
+      long maxValue = max.get();
+      if (currentValue <= maxValue) {
+        done = true;
+      } else {
+        done = max.compareAndSet(maxValue, currentValue);
+        if (done) {
+          stats.incLong(statId, currentValue - maxValue);
+        }
+      }
+    }
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionStatsTest.java
 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionStatsTest.java
new file mode 100644
index 0000000..84232f9
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionStatsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.geode.Statistics;
+
+public class DistributionStatsTest {
+
+  private Statistics mockStats;
+  private DistributionStats distributionStats;
+
+  @Before
+  public void setup() {
+    mockStats = mock(Statistics.class);
+    distributionStats = new DistributionStats(mockStats);
+    DistributionStats.enableClockStats = true;
+  }
+
+  @After
+  public void cleanup() {
+    DistributionStats.enableClockStats = false;
+  }
+
+  @Test
+  public void endReplyWait() {
+    distributionStats.endReplyWait(12000000, 12);
+
+    verify(mockStats).incLong(eq(DistributionStats.replyWaitTimeId), 
Mockito.anyLong());
+    verify(mockStats).incLong(eq(DistributionStats.replyWaitMaxTimeId), 
Mockito.anyLong());
+  }
+
+  @Test
+  public void incSentMessagesTime() {
+    distributionStats.incSentMessagesTime(50000000L);
+
+    verify(mockStats).incLong(DistributionStats.sentMessagesTimeId, 50000000L);
+    verify(mockStats).incLong(DistributionStats.sentMessagesMaxTimeId, 50L);
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeConcurrentTest.java
 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeConcurrentTest.java
new file mode 100644
index 0000000..8260514
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeConcurrentTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static 
org.apache.geode.internal.statistics.StatisticDescriptorImpl.createLongGauge;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.StatisticDescriptor;
+import org.apache.geode.internal.statistics.StatisticsTypeImpl;
+import org.apache.geode.internal.statistics.StripedStatisticsImpl;
+import org.apache.geode.test.concurrency.ConcurrentTestRunner;
+import org.apache.geode.test.concurrency.ParallelExecutor;
+
+@RunWith(ConcurrentTestRunner.class)
+public class MaxLongGaugeConcurrentTest {
+  private static final int PARALLEL_COUNT = 100;
+  public static final int RECORDS_PER_TASK = 20;
+
+
+  @Test
+  public void recordMax(ParallelExecutor executor)
+      throws Exception {
+    StatisticDescriptor descriptor =
+        createLongGauge("1", "", "", true);
+    StatisticDescriptor[] descriptors = {descriptor};
+    StatisticsTypeImpl statisticsType = new StatisticsTypeImpl("abc", "test",
+        descriptors);
+    StripedStatisticsImpl fakeStatistics = new StripedStatisticsImpl(
+        statisticsType,
+        "def", 12, 10,
+        null);
+
+    MaxLongGauge maxLongGauge = new MaxLongGauge(descriptor.getId(), 
fakeStatistics);
+    ConcurrentLinkedQueue<Long> longs = new ConcurrentLinkedQueue<>();
+
+    executor.inParallel(() -> {
+      for (int i = 0; i < RECORDS_PER_TASK; i++) {
+        long value = ThreadLocalRandom.current().nextLong();
+        maxLongGauge.recordMax(value);
+        longs.add(value);
+      }
+    }, PARALLEL_COUNT);
+    executor.execute();
+
+    long actualMax = fakeStatistics.getLong(descriptor.getId());
+    long expectedMax = getMax(longs);
+
+    assertThat(longs).hasSize(RECORDS_PER_TASK * PARALLEL_COUNT);
+    assertThat(actualMax).isEqualTo(expectedMax);
+  }
+
+  private long getMax(ConcurrentLinkedQueue<Long> longs) {
+    return Math.max(longs.parallelStream().max(Long::compareTo).get(), 0);
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeTest.java
 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeTest.java
new file mode 100644
index 0000000..5a84d1d
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static 
org.apache.geode.internal.statistics.StatisticDescriptorImpl.createLongGauge;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.StatisticDescriptor;
+import org.apache.geode.internal.statistics.StatisticsTypeImpl;
+import org.apache.geode.internal.statistics.StripedStatisticsImpl;
+
+public class MaxLongGaugeTest {
+  private StripedStatisticsImpl fakeStatistics;
+  private int statId1;
+  private int statId2;
+
+  @Before
+  public void setup() {
+    StatisticDescriptor descriptor1 =
+        createLongGauge("1", "", "", true);
+    StatisticDescriptor descriptor2 =
+        createLongGauge("2", "", "", true);
+
+    StatisticDescriptor[] descriptors = {descriptor1, descriptor2};
+    StatisticsTypeImpl statisticsType = new StatisticsTypeImpl("abc", "test",
+        descriptors);
+    statId1 = descriptor1.getId();
+    statId2 = descriptor2.getId();
+    fakeStatistics = new StripedStatisticsImpl(
+        statisticsType,
+        "def", 12, 10,
+        null);
+  }
+
+  @Test
+  public void recordMax_singleRecord() {
+    MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics);
+
+    maxLongGauge.recordMax(12);
+
+    assertThat(fakeStatistics.getLong(statId1)).isEqualTo(12);
+  }
+
+  @Test
+  public void recordMax_multipleRecords() {
+    MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics);
+
+    maxLongGauge.recordMax(12);
+    maxLongGauge.recordMax(13);
+
+    assertThat(fakeStatistics.getLong(statId1)).isEqualTo(13);
+  }
+
+  @Test
+  public void recordMax_recordNothing_ifMaxIsNotExceeded() {
+    MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics);
+
+    maxLongGauge.recordMax(12);
+    maxLongGauge.recordMax(11);
+
+    assertThat(fakeStatistics.getLong(statId1)).isEqualTo(12);
+  }
+
+  @Test
+  public void recordMax_ignoresNegatives() {
+    MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics);
+
+    maxLongGauge.recordMax(-12);
+
+    assertThat(fakeStatistics.getLong(statId1)).isEqualTo(0);
+  }
+
+  @Test
+  public void recordMax_ignoresZero() {
+    MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics);
+
+    maxLongGauge.recordMax(0);
+
+    assertThat(fakeStatistics.getLong(statId1)).isEqualTo(0);
+  }
+
+  @Test
+  public void recordMax_usesStatId() {
+    MaxLongGauge maxLongGauge = new MaxLongGauge(statId2, fakeStatistics);
+
+    maxLongGauge.recordMax(17);
+
+    assertThat(fakeStatistics.getLong(statId2)).isEqualTo(17);
+  }
+}

Reply via email to