Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 6068efbaf -> 8b021db7c
ArrivalWindow should use primitives patch by sankalp kohli; reviewed by jasobrown for CASSANDRA-9496 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ad8047ab Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ad8047ab Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ad8047ab Branch: refs/heads/cassandra-2.2 Commit: ad8047abdf5db6652b9586e039debb1e855db09a Parents: ec52e77 Author: Jason Brown <jasedbr...@gmail.com> Authored: Wed Jun 17 14:33:44 2015 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Wed Jun 17 14:33:44 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/gms/FailureDetector.java | 59 ++++++++++++++-- .../cassandra/utils/BoundedStatsDeque.java | 72 -------------------- .../gms/ArrayBackedBoundedStatsTest.java | 57 ++++++++++++++++ .../cassandra/utils/BoundedStatsDequeTest.java | 66 ------------------ 5 files changed, 111 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8047ab/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6d031f6..753fb1c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.16: + * ArrivalWindow should use primitives (CASSANDRA-9496) * Periodically submit background compaction tasks (CASSANDRA-9592) * Set HAS_MORE_PAGES flag to false when PagingState is null (CASSANDRA-9571) * Backport indexed value validation fix from CASSANDRA-9057 (CASSANDRA-9564) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8047ab/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index e247e48..8fdd99f 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -27,14 +27,12 @@ import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.BoundedStatsDeque; import org.apache.cassandra.utils.FBUtilities; /** @@ -289,11 +287,60 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean } } +/* + This class is not thread safe. + */ +class ArrayBackedBoundedStats +{ + private final long[] arrivalIntervals; + private long sum = 0; + private int index = 0; + private boolean isFilled = false; + private volatile double mean = 0; + + public ArrayBackedBoundedStats(final int size) + { + arrivalIntervals = new long[size]; + } + + public void add(long interval) + { + if(index == arrivalIntervals.length) + { + isFilled = true; + index = 0; + } + + if(isFilled) + sum = sum - arrivalIntervals[index]; + + arrivalIntervals[index++] = interval; + sum += interval; + mean = (double)sum / size(); + } + + private int size() + { + return isFilled ? arrivalIntervals.length : index; + } + + public double mean() + { + return mean; + } + + public long[] getArrivalIntervals() + { + return arrivalIntervals; + } + +} + class ArrivalWindow { private static final Logger logger = LoggerFactory.getLogger(ArrivalWindow.class); private long tLast = 0L; - private final BoundedStatsDeque arrivalIntervals; + private final ArrayBackedBoundedStats arrivalIntervals; // this is useless except to provide backwards compatibility in phi_convict_threshold, // because everyone seems pretty accustomed to the default of 8, and users who have @@ -309,7 +356,7 @@ class ArrivalWindow ArrivalWindow(int size) { - arrivalIntervals = new BoundedStatsDeque(size); + arrivalIntervals = new ArrayBackedBoundedStats(size); } private static long getMaxInterval() @@ -355,14 +402,14 @@ class ArrivalWindow // see CASSANDRA-2597 for an explanation of the math at work here. double phi(long tnow) { - assert arrivalIntervals.size() > 0 && tLast > 0; // should not be called before any samples arrive + assert arrivalIntervals.mean() > 0 && tLast > 0; // should not be called before any samples arrive long t = tnow - tLast; return t / mean(); } public String toString() { - return StringUtils.join(arrivalIntervals.iterator(), " "); + return Arrays.toString(arrivalIntervals.getArrivalIntervals()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8047ab/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java b/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java deleted file mode 100644 index 3983b74..0000000 --- a/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.utils; - -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import com.google.common.util.concurrent.AtomicDouble; - -/** - * bounded threadsafe deque - */ -public class BoundedStatsDeque implements Iterable<Long> -{ - private final LinkedBlockingDeque<Long> deque; - private final AtomicLong sum; - - public BoundedStatsDeque(int size) - { - deque = new LinkedBlockingDeque<>(size); - sum = new AtomicLong(0); - } - - public Iterator<Long> iterator() - { - return deque.iterator(); - } - - public int size() - { - return deque.size(); - } - - public void add(long i) - { - if (!deque.offer(i)) - { - Long removed = deque.remove(); - sum.addAndGet(-removed); - deque.offer(i); - } - sum.addAndGet(i); - } - - public long sum() - { - return sum.get(); - } - - public double mean() - { - return size() > 0 ? ((double) sum()) / size() : 0; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8047ab/test/unit/org/apache/cassandra/gms/ArrayBackedBoundedStatsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/ArrayBackedBoundedStatsTest.java b/test/unit/org/apache/cassandra/gms/ArrayBackedBoundedStatsTest.java new file mode 100644 index 0000000..b6f4e07 --- /dev/null +++ b/test/unit/org/apache/cassandra/gms/ArrayBackedBoundedStatsTest.java @@ -0,0 +1,57 @@ +package org.apache.cassandra.gms; + +import java.util.Arrays; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ArrayBackedBoundedStatsTest { + + @Test + public void test() + { + int size = 4; + + ArrayBackedBoundedStats bsd = new ArrayBackedBoundedStats(size); + //check the values for an empty result + assertEquals(0, bsd.mean(), 0.001d); + + bsd.add(1L); //this one falls out, over limit + bsd.add(2L); + bsd.add(3L); + bsd.add(4L); + bsd.add(5L); + + //verify that everything is in there + long [] expected = new long[] {2,3,4,5}; + assertArrivalIntervals(bsd, expected); + + //check results + assertEquals(3.5, bsd.mean(), 0.001d); + } + + private void assertArrivalIntervals(ArrayBackedBoundedStats bsd, long [] expected) + { + Arrays.sort(expected); + Arrays.sort(bsd.getArrivalIntervals()); + assertTrue(Arrays.equals(bsd.getArrivalIntervals(), expected)); + + } + + @Test + public void testMultipleRounds() throws Exception + { + int size = 5; + ArrayBackedBoundedStats bsd = new ArrayBackedBoundedStats(size); + + for(long i=0; i <= 1000;i++) + { + bsd.add(i); + } + + long [] expected = new long[] {1000,999,998,997, 996}; + assertArrivalIntervals(bsd, expected); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8047ab/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java b/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java deleted file mode 100644 index b64a765..0000000 --- a/test/unit/org/apache/cassandra/utils/BoundedStatsDequeTest.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.apache.cassandra.utils; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import static org.junit.Assert.*; - -import java.util.Iterator; - -import org.junit.Test; - -public class BoundedStatsDequeTest -{ - @Test - public void test() - { - int size = 4; - - BoundedStatsDeque bsd = new BoundedStatsDeque(size); - //check the values for an empty result - assertEquals(0, bsd.size()); - assertEquals(0, bsd.sum(), 0.001d); - assertEquals(0, bsd.mean(), 0.001d); - - bsd.add(1L); //this one falls out, over limit - bsd.add(2L); - bsd.add(3L); - bsd.add(4L); - bsd.add(5L); - - //verify that everything is in there - Iterator<Long> iter = bsd.iterator(); - assertTrue(iter.hasNext()); - assertEquals(2L, iter.next(), 0); - assertTrue(iter.hasNext()); - assertEquals(3L, iter.next(), 0); - assertTrue(iter.hasNext()); - assertEquals(4L, iter.next(), 0); - assertTrue(iter.hasNext()); - assertEquals(5L, iter.next(), 0); - assertFalse(iter.hasNext()); - - //check results - assertEquals(size, bsd.size()); - assertEquals(14, bsd.sum(), 0.001d); - assertEquals(3.5, bsd.mean(), 0.001d); - } -}