This is an automated email from the ASF dual-hosted git repository. dschneider pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 0b15a81 GEODE-6889, GEODE-6890: remove sync when updating max stats (#3728) 0b15a81 is described below commit 0b15a8119b7b42df3b69825f42b9e096556c91a0 Author: Murtuza Boxwala <mboxw...@pivotal.io> AuthorDate: Fri Jun 21 12:38:15 2019 -0400 GEODE-6889, GEODE-6890: remove sync when updating max stats (#3728) * GEODE-6889: Ensure the highest value is recorded to replyWaitMaxTime * GEODE-6890: Ensure the highest value is recorded to maxSentMessagesTime Co-authored-by: Murtuza Boxwala <mboxw...@pivotal.io> --- .../distributed/internal/DistributionStats.java | 45 ++++----- .../geode/distributed/internal/MaxLongGauge.java | 51 ++++++++++ .../internal/DistributionStatsTest.java | 60 ++++++++++++ .../internal/MaxLongGaugeConcurrentTest.java | 73 ++++++++++++++ .../distributed/internal/MaxLongGaugeTest.java | 105 +++++++++++++++++++++ 5 files changed, 309 insertions(+), 25 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java index 13a77fb..14e6bae 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java @@ -22,6 +22,7 @@ import org.apache.geode.StatisticsFactory; import org.apache.geode.StatisticsType; import org.apache.geode.StatisticsTypeFactory; import org.apache.geode.annotations.Immutable; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.annotations.internal.MakeNotStatic; import org.apache.geode.internal.NanoTimer; import org.apache.geode.internal.logging.LogService; @@ -48,8 +49,10 @@ public class DistributionStats implements DMStats { private static final int sentMessagesId; private static final int sentCommitMessagesId; private static final int commitWaitsId; - private static final int sentMessagesTimeId; - private static final int sentMessagesMaxTimeId; + @VisibleForTesting + static final int sentMessagesTimeId; + @VisibleForTesting + static final int sentMessagesMaxTimeId; private static final int broadcastMessagesId; private static final int broadcastMessagesTimeId; private static final int receivedMessagesId; @@ -91,9 +94,11 @@ public class DistributionStats implements DMStats { private static final int serialQueueThrottleCountId; private static final int replyWaitsInProgressId; private static final int replyWaitsCompletedId; - private static final int replyWaitTimeId; + @VisibleForTesting + static final int replyWaitTimeId; private static final int replyTimeoutsId; - private static final int replyWaitMaxTimeId; + @VisibleForTesting + static final int replyWaitMaxTimeId; private static final int receiverConnectionsId; private static final int failedAcceptsId; private static final int failedConnectsId; @@ -936,6 +941,9 @@ public class DistributionStats implements DMStats { /** The Statistics object that we delegate most behavior to */ private final Statistics stats; + private final MaxLongGauge maxReplyWaitTime; + private final MaxLongGauge maxSentMessagesTime; + // private final HistogramStats replyHandoffHistogram; // private final HistogramStats replyWaitHistogram; @@ -946,7 +954,7 @@ public class DistributionStats implements DMStats { * factory. */ public DistributionStats(StatisticsFactory f, long statId) { - this.stats = f.createAtomicStatistics(type, "distributionStats", statId); + this(f.createAtomicStatistics(type, "distributionStats", statId)); // this.replyHandoffHistogram = new HistogramStats("ReplyHandOff", "nanoseconds", f, // new long[] {100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1000000}, // false); @@ -960,6 +968,8 @@ public class DistributionStats implements DMStats { */ public DistributionStats(Statistics stats) { this.stats = stats; + maxReplyWaitTime = new MaxLongGauge(replyWaitMaxTimeId, stats); + maxSentMessagesTime = new MaxLongGauge(sentMessagesMaxTimeId, stats); // this.replyHandoffHistogram = null; // this.replyWaitHistogram = null; } @@ -1040,22 +1050,12 @@ public class DistributionStats implements DMStats { public void incSentMessagesTime(long nanos) { if (enableClockStats) { this.stats.incLong(sentMessagesTimeId, nanos); - long millis = nanos / 1000000; - if (getSentMessagesMaxTime() < millis) { - this.stats.setLong(sentMessagesMaxTimeId, millis); - } + long millis = NanoTimer.nanosToMillis(nanos); + maxSentMessagesTime.recordMax(millis); } } /** - * Returns the longest time required to distribute a message, in nanos - */ - public long getSentMessagesMaxTime() { - return this.stats.getLong(sentMessagesMaxTimeId); - } - - - /** * Returns the total number of messages broadcast by the distribution manager */ @Override @@ -1376,10 +1376,6 @@ public class DistributionStats implements DMStats { return stats.getLong(replyWaitTimeId); } - public long getReplyWaitMaxTime() { - return stats.getLong(replyWaitMaxTimeId); - } - @Override public long startSocketWrite(boolean sync) { if (sync) { @@ -1600,6 +1596,7 @@ public class DistributionStats implements DMStats { return getStatTime(); } + @Override public void endReplyWait(long startNanos, long initTime) { if (enableClockStats) { @@ -1607,10 +1604,8 @@ public class DistributionStats implements DMStats { // this.replyWaitHistogram.endOp(delta); } if (initTime != 0) { - long mswait = System.currentTimeMillis() - initTime; - if (mswait > getReplyWaitMaxTime()) { - stats.setLong(replyWaitMaxTimeId, mswait); - } + long waitTime = System.currentTimeMillis() - initTime; + maxReplyWaitTime.recordMax(waitTime); } stats.incInt(replyWaitsInProgressId, -1); stats.incInt(replyWaitsCompletedId, 1); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/MaxLongGauge.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/MaxLongGauge.java new file mode 100644 index 0000000..43c281a --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/MaxLongGauge.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.geode.Statistics; + +/** + * This class holds the max value of a stat inside an AtomicLong. Every time a higher value is found + * the class will forward the delta between the old max and the new max to the statistics class. + * This pattern is a lock-less alternative to calling Statistics.getLong and then updating the max. + */ +class MaxLongGauge { + private final int statId; + private final Statistics stats; + private final AtomicLong max; + + public MaxLongGauge(int statId, Statistics stats) { + this.statId = statId; + this.stats = stats; + max = new AtomicLong(); + } + + public void recordMax(long currentValue) { + boolean done = false; + while (!done) { + long maxValue = max.get(); + if (currentValue <= maxValue) { + done = true; + } else { + done = max.compareAndSet(maxValue, currentValue); + if (done) { + stats.incLong(statId, currentValue - maxValue); + } + } + } + } +} diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionStatsTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionStatsTest.java new file mode 100644 index 0000000..84232f9 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionStatsTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.geode.Statistics; + +public class DistributionStatsTest { + + private Statistics mockStats; + private DistributionStats distributionStats; + + @Before + public void setup() { + mockStats = mock(Statistics.class); + distributionStats = new DistributionStats(mockStats); + DistributionStats.enableClockStats = true; + } + + @After + public void cleanup() { + DistributionStats.enableClockStats = false; + } + + @Test + public void endReplyWait() { + distributionStats.endReplyWait(12000000, 12); + + verify(mockStats).incLong(eq(DistributionStats.replyWaitTimeId), Mockito.anyLong()); + verify(mockStats).incLong(eq(DistributionStats.replyWaitMaxTimeId), Mockito.anyLong()); + } + + @Test + public void incSentMessagesTime() { + distributionStats.incSentMessagesTime(50000000L); + + verify(mockStats).incLong(DistributionStats.sentMessagesTimeId, 50000000L); + verify(mockStats).incLong(DistributionStats.sentMessagesMaxTimeId, 50L); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeConcurrentTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeConcurrentTest.java new file mode 100644 index 0000000..8260514 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeConcurrentTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal; + +import static org.apache.geode.internal.statistics.StatisticDescriptorImpl.createLongGauge; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadLocalRandom; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.geode.StatisticDescriptor; +import org.apache.geode.internal.statistics.StatisticsTypeImpl; +import org.apache.geode.internal.statistics.StripedStatisticsImpl; +import org.apache.geode.test.concurrency.ConcurrentTestRunner; +import org.apache.geode.test.concurrency.ParallelExecutor; + +@RunWith(ConcurrentTestRunner.class) +public class MaxLongGaugeConcurrentTest { + private static final int PARALLEL_COUNT = 100; + public static final int RECORDS_PER_TASK = 20; + + + @Test + public void recordMax(ParallelExecutor executor) + throws Exception { + StatisticDescriptor descriptor = + createLongGauge("1", "", "", true); + StatisticDescriptor[] descriptors = {descriptor}; + StatisticsTypeImpl statisticsType = new StatisticsTypeImpl("abc", "test", + descriptors); + StripedStatisticsImpl fakeStatistics = new StripedStatisticsImpl( + statisticsType, + "def", 12, 10, + null); + + MaxLongGauge maxLongGauge = new MaxLongGauge(descriptor.getId(), fakeStatistics); + ConcurrentLinkedQueue<Long> longs = new ConcurrentLinkedQueue<>(); + + executor.inParallel(() -> { + for (int i = 0; i < RECORDS_PER_TASK; i++) { + long value = ThreadLocalRandom.current().nextLong(); + maxLongGauge.recordMax(value); + longs.add(value); + } + }, PARALLEL_COUNT); + executor.execute(); + + long actualMax = fakeStatistics.getLong(descriptor.getId()); + long expectedMax = getMax(longs); + + assertThat(longs).hasSize(RECORDS_PER_TASK * PARALLEL_COUNT); + assertThat(actualMax).isEqualTo(expectedMax); + } + + private long getMax(ConcurrentLinkedQueue<Long> longs) { + return Math.max(longs.parallelStream().max(Long::compareTo).get(), 0); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeTest.java new file mode 100644 index 0000000..5a84d1d --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/MaxLongGaugeTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal; + +import static org.apache.geode.internal.statistics.StatisticDescriptorImpl.createLongGauge; +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.StatisticDescriptor; +import org.apache.geode.internal.statistics.StatisticsTypeImpl; +import org.apache.geode.internal.statistics.StripedStatisticsImpl; + +public class MaxLongGaugeTest { + private StripedStatisticsImpl fakeStatistics; + private int statId1; + private int statId2; + + @Before + public void setup() { + StatisticDescriptor descriptor1 = + createLongGauge("1", "", "", true); + StatisticDescriptor descriptor2 = + createLongGauge("2", "", "", true); + + StatisticDescriptor[] descriptors = {descriptor1, descriptor2}; + StatisticsTypeImpl statisticsType = new StatisticsTypeImpl("abc", "test", + descriptors); + statId1 = descriptor1.getId(); + statId2 = descriptor2.getId(); + fakeStatistics = new StripedStatisticsImpl( + statisticsType, + "def", 12, 10, + null); + } + + @Test + public void recordMax_singleRecord() { + MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics); + + maxLongGauge.recordMax(12); + + assertThat(fakeStatistics.getLong(statId1)).isEqualTo(12); + } + + @Test + public void recordMax_multipleRecords() { + MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics); + + maxLongGauge.recordMax(12); + maxLongGauge.recordMax(13); + + assertThat(fakeStatistics.getLong(statId1)).isEqualTo(13); + } + + @Test + public void recordMax_recordNothing_ifMaxIsNotExceeded() { + MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics); + + maxLongGauge.recordMax(12); + maxLongGauge.recordMax(11); + + assertThat(fakeStatistics.getLong(statId1)).isEqualTo(12); + } + + @Test + public void recordMax_ignoresNegatives() { + MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics); + + maxLongGauge.recordMax(-12); + + assertThat(fakeStatistics.getLong(statId1)).isEqualTo(0); + } + + @Test + public void recordMax_ignoresZero() { + MaxLongGauge maxLongGauge = new MaxLongGauge(statId1, fakeStatistics); + + maxLongGauge.recordMax(0); + + assertThat(fakeStatistics.getLong(statId1)).isEqualTo(0); + } + + @Test + public void recordMax_usesStatId() { + MaxLongGauge maxLongGauge = new MaxLongGauge(statId2, fakeStatistics); + + maxLongGauge.recordMax(17); + + assertThat(fakeStatistics.getLong(statId2)).isEqualTo(17); + } +}