Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 442f4737c -> f3668e155
  refs/heads/cassandra-3.3 f2174280d -> c146983e4
  refs/heads/trunk 4fb559b58 -> 3bf4be46f
10688: bound search space and support unlimited depth for tracing during circular strong-ref leak detection Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3668e15 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3668e15 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3668e15 Branch: refs/heads/cassandra-3.0 Commit: f3668e15537426eaca63f0395cec18c3b7cba83a Parents: 442f473 Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Wed Nov 25 20:16:51 2015 -0500 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Tue Jan 26 13:27:08 2016 +0000 ---------------------------------------------------------------------- .../apache/cassandra/utils/NoSpamLogger.java | 25 +- .../apache/cassandra/utils/concurrent/Ref.java | 212 ++++++++++++++- .../cassandra/utils/NoSpamLoggerTest.java | 8 + .../utils/concurrent/RefCountedTest.java | 272 +++++++++++++++++++ 4 files changed, 497 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3668e15/src/java/org/apache/cassandra/utils/NoSpamLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/NoSpamLogger.java b/src/java/org/apache/cassandra/utils/NoSpamLogger.java index 84bfa68..df3d2e4 100644 --- a/src/java/org/apache/cassandra/utils/NoSpamLogger.java +++ b/src/java/org/apache/cassandra/utils/NoSpamLogger.java @@ -156,13 +156,18 @@ public class NoSpamLogger public static boolean log(Logger logger, Level level, long minInterval, TimeUnit unit, String message, Object... 
objects) { - return log(logger, level, minInterval, unit, CLOCK.nanoTime(), message, objects); + return log(logger, level, message, minInterval, unit, CLOCK.nanoTime(), message, objects); } - public static boolean log(Logger logger, Level level, long minInterval, TimeUnit unit, long nowNanos, String message, Object... objects) + public static boolean log(Logger logger, Level level, String key, long minInterval, TimeUnit unit, String message, Object... objects) + { + return log(logger, level, key, minInterval, unit, CLOCK.nanoTime(), message, objects); + } + + public static boolean log(Logger logger, Level level, String key, long minInterval, TimeUnit unit, long nowNanos, String message, Object... objects) { NoSpamLogger wrapped = getLogger(logger, minInterval, unit); - NoSpamLogStatement statement = wrapped.getStatement(message); + NoSpamLogStatement statement = wrapped.getStatement(key, message); return statement.log(level, nowNanos, objects); } @@ -221,17 +226,27 @@ public class NoSpamLogger return NoSpamLogger.this.getStatement(s, minIntervalNanos); } + public NoSpamLogStatement getStatement(String key, String s) + { + return NoSpamLogger.this.getStatement(key, s, minIntervalNanos); + } + public NoSpamLogStatement getStatement(String s, long minInterval, TimeUnit unit) { return NoSpamLogger.this.getStatement(s, unit.toNanos(minInterval)); } public NoSpamLogStatement getStatement(String s, long minIntervalNanos) { - NoSpamLogStatement statement = lastMessage.get(s); + return getStatement(s, s, minIntervalNanos); + } + + public NoSpamLogStatement getStatement(String key, String s, long minIntervalNanos) + { + NoSpamLogStatement statement = lastMessage.get(key); if (statement == null) { statement = new NoSpamLogStatement(s, minIntervalNanos); - NoSpamLogStatement temp = lastMessage.putIfAbsent(s, statement); + NoSpamLogStatement temp = lastMessage.putIfAbsent(key, statement); if (temp != null) statement = temp; } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3668e15/src/java/org/apache/cassandra/utils/concurrent/Ref.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java index 856c21e..25ebde9 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java @@ -9,10 +9,13 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -21,6 +24,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.Memory; import org.apache.cassandra.io.util.SafeMemory; import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.Pair; +import org.cliffc.high_scale_lib.NonBlockingHashMap; import static java.util.Collections.emptyList; @@ -314,10 +319,20 @@ public final class Ref<T> implements RefCounted<T> } } + private static final Class<?>[] concurrentIterableClasses = new Class<?>[] { + ConcurrentLinkedQueue.class, + ConcurrentLinkedDeque.class, + ConcurrentSkipListSet.class, + CopyOnWriteArrayList.class, + CopyOnWriteArraySet.class, + DelayQueue.class, + NonBlockingHashMap.class, + }; + static final Set<Class<?>> concurrentIterables = Collections.newSetFromMap(new IdentityHashMap<>()); private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<>()); static final 
ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>(); private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper")); - private static final ScheduledExecutorService STRONG_LEAK_DETECTOR = !DEBUG_ENABLED ? null : Executors.newScheduledThreadPool(1, new NamedThreadFactory("Strong-Reference-Leak-Detector")); + static final ScheduledExecutorService STRONG_LEAK_DETECTOR = !DEBUG_ENABLED ? null : Executors.newScheduledThreadPool(1, new NamedThreadFactory("Strong-Reference-Leak-Detector")); static { EXEC.execute(new ReferenceReaper()); @@ -326,6 +341,7 @@ public final class Ref<T> implements RefCounted<T> STRONG_LEAK_DETECTOR.scheduleAtFixedRate(new Visitor(), 1, 15, TimeUnit.MINUTES); STRONG_LEAK_DETECTOR.scheduleAtFixedRate(new StrongLeakDetector(), 2, 15, TimeUnit.MINUTES); } + concurrentIterables.addAll(Arrays.asList(concurrentIterableClasses)); } static final class ReferenceReaper implements Runnable @@ -353,10 +369,145 @@ public final class Ref<T> implements RefCounted<T> } } + static final Deque<InProgressVisit> inProgressVisitPool = new ArrayDeque<InProgressVisit>(); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + static InProgressVisit newInProgressVisit(Object o, List<Field> fields, Field field, String name) + { + Preconditions.checkNotNull(o); + InProgressVisit ipv = inProgressVisitPool.pollLast(); + if (ipv == null) + ipv = new InProgressVisit(); + + ipv.o = o; + if (o instanceof Object[]) + ipv.collectionIterator = Arrays.asList((Object[])o).iterator(); + else if (o instanceof ConcurrentMap) + { + ipv.isMapIterator = true; + ipv.collectionIterator = ((Map)o).entrySet().iterator(); + } + else if (concurrentIterables.contains(o.getClass()) | o instanceof BlockingQueue) + ipv.collectionIterator = ((Iterable)o).iterator(); + + ipv.fields = fields; + ipv.field = field; + ipv.name = name; + return ipv; + } + + static void returnInProgressVisit(InProgressVisit ipv) + { + if 
(inProgressVisitPool.size() > 1024) + return; + ipv.name = null; + ipv.fields = null; + ipv.o = null; + ipv.fieldIndex = 0; + ipv.field = null; + ipv.collectionIterator = null; + ipv.mapEntryValue = null; + ipv.isMapIterator = false; + inProgressVisitPool.offer(ipv); + } + + /* + * Stack state for walking an object graph. + * Field index is the index of the current field being fetched. + */ + @SuppressWarnings({ "rawtypes"}) + static class InProgressVisit + { + String name; + List<Field> fields; + Object o; + int fieldIndex = 0; + Field field; + + //Need to know if Map.Entry should be returned or traversed as an object + boolean isMapIterator; + //If o is a ConcurrentMap, BlockingQueue, or Object[], this is populated with an iterator over the contents + Iterator<Object> collectionIterator; + //If o is a ConcurrentMap the entry set contains keys and values. The key is returned as the first child + //And the associated value is stashed here and returned next + Object mapEntryValue; + + private Field nextField() + { + if (fields.isEmpty()) + return null; + + if (fieldIndex >= fields.size()) + return null; + + Field retval = fields.get(fieldIndex); + fieldIndex++; + return retval; + } + + Pair<Object, Field> nextChild() throws IllegalAccessException + { + //If the last child returned was a key from a map, the value from that entry is stashed + //so it can be returned next + if (mapEntryValue != null) + { + Pair<Object, Field> retval = Pair.create(mapEntryValue, field); + mapEntryValue = null; + return retval; + } + + //If o is a ConcurrentMap, BlockingQueue, or Object[], then an iterator will be stored to return the elements + if (collectionIterator != null) + { + if (!collectionIterator.hasNext()) + return null; + Object nextItem = null; + //Find the next non-null element to traverse since returning null will cause the visitor to stop + while (collectionIterator.hasNext() && (nextItem = collectionIterator.next()) == null){} + if (nextItem != null) + { + if 
(isMapIterator && nextItem instanceof Map.Entry) + { + Map.Entry entry = (Map.Entry)nextItem; + mapEntryValue = entry.getValue(); + return Pair.create(entry.getKey(), field); + } + return Pair.create(nextItem, field); + } + else + { + return null; + } + } + + //Basic traversal of an object by its member fields + //Don't return null values as that indicates no more objects + while (true) + { + Field nextField = nextField(); + if (nextField == null) + return null; + Object nextObject = nextField.get(o); + if (nextObject != null) + return Pair.create(nextField.get(o), nextField); + } + } + + @Override + public String toString() + { + return field == null ? name : field.toString() + "-" + o.getClass().getName(); + } + } + static class Visitor implements Runnable { - final Stack<Field> path = new Stack<>(); + final Deque<InProgressVisit> path = new ArrayDeque<>(); final Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>()); + @VisibleForTesting + int lastVisitedCount; + @VisibleForTesting + long iterations = 0; GlobalState visiting; public void run() @@ -371,9 +522,11 @@ public final class Ref<T> implements RefCounted<T> // do a graph exploration of the GlobalState, since it should be shallow; if it references itself, we have a problem path.clear(); visited.clear(); + lastVisitedCount = 0; + iterations = 0; visited.add(globalState); visiting = globalState; - visit(globalState.tidy); + traverse(globalState.tidy); } } catch (Throwable t) @@ -382,37 +535,66 @@ public final class Ref<T> implements RefCounted<T> } finally { + lastVisitedCount = visited.size(); path.clear(); visited.clear(); } } - void visit(final Object object) + /* + * Searches for an indirect strong reference between rootObject and visiting. 
+ */ + void traverse(final RefCounted.Tidy rootObject) { - for (Field field : getFields(object.getClass())) + path.offer(newInProgressVisit(rootObject, getFields(rootObject.getClass()), null, rootObject.name())); + + InProgressVisit inProgress = null; + while (inProgress != null || !path.isEmpty()) { - path.push(field); + //If necessary fetch the next object to start tracing + if (inProgress == null) + inProgress = path.pollLast(); + try { - Object child = field.get(object); + Pair<Object, Field> p = inProgress.nextChild(); + Object child = null; + Field field = null; + + if (p != null) + { + iterations++; + child = p.left; + field = p.right; + } + if (child != null && visited.add(child)) { - visit(child); + path.offer(inProgress); + inProgress = newInProgressVisit(child, getFields(child.getClass()), field, null); + continue; } else if (visiting == child) { - logger.error("Strong self-ref loop detected {}", path); + NoSpamLogger.log(logger, + NoSpamLogger.Level.ERROR, + rootObject.getClass().getName(), + 1, + TimeUnit.SECONDS, + "Strong self-ref loop detected {}", + path); + } + else if (child == null) + { + returnInProgressVisit(inProgress); + inProgress = null; + continue; } } catch (IllegalAccessException e) { NoSpamLogger.log(logger, NoSpamLogger.Level.ERROR, 5, TimeUnit.MINUTES, "Could not fully check for self-referential leaks", e); } - catch (StackOverflowError e) - { - logger.error("Stackoverflow {}", path); - } - path.pop(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3668e15/test/unit/org/apache/cassandra/utils/NoSpamLoggerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/NoSpamLoggerTest.java b/test/unit/org/apache/cassandra/utils/NoSpamLoggerTest.java index afe4968..702fa98 100644 --- a/test/unit/org/apache/cassandra/utils/NoSpamLoggerTest.java +++ b/test/unit/org/apache/cassandra/utils/NoSpamLoggerTest.java @@ -126,6 +126,14 @@ public class 
NoSpamLoggerTest assertTrue(NoSpamLogger.log( mock, l, 5, TimeUnit.NANOSECONDS, statement, param)); assertEquals(2, logged.get(l).size()); + + assertTrue(NoSpamLogger.log( mock, l, "key", 5, TimeUnit.NANOSECONDS, statement, param)); + + assertEquals(3, logged.get(l).size()); + + assertFalse(NoSpamLogger.log( mock, l, "key", 5, TimeUnit.NANOSECONDS, statement, param)); + + assertEquals(3, logged.get(l).size()); } private void assertLoggedSizes(int info, int warn, int error) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3668e15/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java index bb173fe..1a1864f 100644 --- a/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java +++ b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java @@ -21,10 +21,31 @@ package org.apache.cassandra.utils.concurrent; import org.junit.Test; import junit.framework.Assert; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.Ref.Visitor; +@SuppressWarnings({"unused", "unchecked", "rawtypes"}) public class RefCountedTest { + static + { + if (Ref.STRONG_LEAK_DETECTOR != null) + Ref.STRONG_LEAK_DETECTOR.submit(() -> { Thread.sleep(Integer.MAX_VALUE); return null; }); + } private static final class Tidier implements RefCounted.Tidy { @@ -97,4 +118,255 @@ 
public class RefCountedTest throw new AssertionError(); ref.release(); } + + static final int entryCount = 1000000; + static final int fudgeFactor = 20; + + @Test + public void testLinkedList() + { + final List<Object> iterable = new LinkedList<Object>(); + Pair<Object, Object> p = Pair.create(iterable, iterable); + RefCounted.Tidy tidier = new RefCounted.Tidy() { + Object ref = iterable; + @Override + public void tidy() throws Exception + { + } + + @Override + public String name() + { + return "42"; + } + }; + Ref<Object> ref = new Ref(new AtomicReference<List<Object>>(iterable), tidier); + for (int i = 0; i < entryCount; i++) + { + iterable.add(p); + } + Visitor visitor = new Visitor(); + visitor.run(); + ref.close(); + + System.out.println("LinkedList visited " + visitor.lastVisitedCount + " iterations " + visitor.iterations); + //Should visit a lot of list nodes, but no more since there is only one object stored in the list + Assert.assertTrue(visitor.lastVisitedCount > entryCount && visitor.lastVisitedCount < entryCount + fudgeFactor); + //Should have a lot of iterations to walk the list, but linear to the number of entries + Assert.assertTrue(visitor.iterations > (entryCount * 3) && visitor.iterations < (entryCount * 3) + fudgeFactor); + } + + /* + * There was a traversal error terminating traversal for an object upon encountering a null + * field. Test for the bug here using CLQ. 
+ */ + @Test + public void testCLQBug() + { + Ref.concurrentIterables.remove(ConcurrentLinkedQueue.class); + try + { + testConcurrentLinkedQueueImpl(true); + } + finally + { + Ref.concurrentIterables.add(ConcurrentLinkedQueue.class); + } + } + + private void testConcurrentLinkedQueueImpl(boolean bugTest) + { + final Queue<Object> iterable = new ConcurrentLinkedQueue<Object>(); + Pair<Object, Object> p = Pair.create(iterable, iterable); + RefCounted.Tidy tidier = new RefCounted.Tidy() { + Object ref = iterable; + @Override + public void tidy() throws Exception + { + } + + @Override + public String name() + { + return "42"; + } + }; + Ref<Object> ref = new Ref(new AtomicReference<Queue<Object>>(iterable), tidier); + for (int i = 0; i < entryCount; i++) + { + iterable.add(p); + } + Visitor visitor = new Visitor(); + visitor.run(); + ref.close(); + + System.out.println("ConcurrentLinkedQueue visited " + visitor.lastVisitedCount + " iterations " + visitor.iterations + " bug test " + bugTest); + + if (bugTest) + { + //Should have to visit a lot of queue nodes + Assert.assertTrue(visitor.lastVisitedCount > entryCount && visitor.lastVisitedCount < entryCount + fudgeFactor); + //Should have a lot of iterations to walk the queue, but linear to the number of entries + Assert.assertTrue(visitor.iterations > (entryCount * 2) && visitor.iterations < (entryCount * 2) + fudgeFactor); + } + else + { + //There are almost no objects in this linked list once it's iterated as a collection so visited count + //should be small + Assert.assertTrue(visitor.lastVisitedCount < 10); + //Should have a lot of iterations to walk the collection, but linear to the number of entries + Assert.assertTrue(visitor.iterations > entryCount && visitor.iterations < entryCount + fudgeFactor); + } + } + + @Test + public void testConcurrentLinkedQueue() + { + testConcurrentLinkedQueueImpl(false); + } + + @Test + public void testBlockingQueue() + { + final BlockingQueue<Object> iterable = new 
LinkedBlockingQueue<Object>(); + Pair<Object, Object> p = Pair.create(iterable, iterable); + RefCounted.Tidy tidier = new RefCounted.Tidy() { + Object ref = iterable; + @Override + public void tidy() throws Exception + { + } + + @Override + public String name() + { + return "42"; + } + }; + Ref<Object> ref = new Ref(new AtomicReference<BlockingQueue<Object>>(iterable), tidier); + for (int i = 0; i < entryCount; i++) + { + iterable.add(p); + } + Visitor visitor = new Visitor(); + visitor.run(); + ref.close(); + + System.out.println("BlockingQueue visited " + visitor.lastVisitedCount + " iterations " + visitor.iterations); + //There are almost no objects in this queue once it's iterated as a collection so visited count + //should be small + Assert.assertTrue(visitor.lastVisitedCount < 10); + //Should have a lot of iterations to walk the collection, but linear to the number of entries + Assert.assertTrue(visitor.iterations > entryCount && visitor.iterations < entryCount + fudgeFactor); + } + + @Test + public void testConcurrentMap() + { + final Map<Object, Object> map = new ConcurrentHashMap<Object, Object>(); + RefCounted.Tidy tidier = new RefCounted.Tidy() { + Object ref = map; + @Override + public void tidy() throws Exception + { + } + + @Override + public String name() + { + return "42"; + } + }; + Ref<Object> ref = new Ref(new AtomicReference<Map<Object, Object>>(map), tidier); + + Object o = new Object(); + for (int i = 0; i < entryCount; i++) + { + map.put(new Object(), o); + } + Visitor visitor = new Visitor(); + visitor.run(); + ref.close(); + + System.out.println("ConcurrentHashMap visited " + visitor.lastVisitedCount + " iterations " + visitor.iterations); + + //Should visit roughly the same number of objects as entries because the value object is constant + //Map.Entry objects shouldn't be counted since it is iterated as a collection + Assert.assertTrue(visitor.lastVisitedCount > entryCount && visitor.lastVisitedCount < entryCount + fudgeFactor); + 
//Should visit 2x the number of entries since we have to traverse the key and value separately + Assert.assertTrue(visitor.iterations > entryCount * 2 && visitor.iterations < entryCount * 2 + fudgeFactor); + } + + @Test + public void testHashMap() + { + final Map<Object, Object> map = new HashMap<Object, Object>(); + RefCounted.Tidy tidier = new RefCounted.Tidy() { + Object ref = map; + @Override + public void tidy() throws Exception + { + } + + @Override + public String name() + { + return "42"; + } + }; + Ref<Object> ref = new Ref(new AtomicReference<Map<Object, Object>>(map), tidier); + + Object o = new Object(); + for (int i = 0; i < entryCount; i++) + { + map.put(new Object(), o); + } + Visitor visitor = new Visitor(); + visitor.run(); + ref.close(); + + System.out.println("HashMap visited " + visitor.lastVisitedCount + " iterations " + visitor.iterations); + + //Should visit 2x the number of entries because of the wrapper Map.Entry objects + Assert.assertTrue(visitor.lastVisitedCount > (entryCount * 2) && visitor.lastVisitedCount < (entryCount * 2) + fudgeFactor); + //Should iterate 3x the number of entries since we have to traverse the key and value separately + Assert.assertTrue(visitor.iterations > (entryCount * 3) && visitor.iterations < (entryCount * 3) + fudgeFactor); + } + + @Test + public void testArray() throws Exception + { + final Object objects[] = new Object[entryCount]; + for (int i = 0; i < entryCount; i += 2) + objects[i] = new Object(); + + File f = File.createTempFile("foo", "bar"); + RefCounted.Tidy tidier = new RefCounted.Tidy() { + Object ref = objects; + //Checking we don't get an infinite loop out of traversing file refs + File fileRef = f; + + @Override + public void tidy() throws Exception + { + } + + @Override + public String name() + { + return "42"; + } + }; + Ref<Object> ref = new Ref(new AtomicReference<Object[]>(objects), tidier); + + Visitor visitor = new Visitor(); + visitor.run(); + ref.close(); + + System.out.println("Array 
visited " + visitor.lastVisitedCount + " iterations " + visitor.iterations); + //Should iterate the elements in the array and get a unique object from every other one + Assert.assertTrue(visitor.lastVisitedCount > (entryCount / 2) && visitor.lastVisitedCount < (entryCount / 2) + fudgeFactor); + //Should iterate over the array touching roughly the same number of objects as entries + Assert.assertTrue(visitor.iterations > (entryCount / 2) && visitor.iterations < (entryCount / 2) + fudgeFactor); + } }