This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 538bd7e KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433) 538bd7e is described below commit 538bd7eddf13897245524f015e3207affb03fcdc Author: A. Sophie Blee-Goldman <ableegold...@gmail.com> AuthorDate: Tue Mar 19 08:51:10 2019 -0700 KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433) Use concurrent data structure for the underlying cache in NamedCache, and iterate over it with subMap instead of many calls to get() Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com> --- .../kafka/streams/state/internals/NamedCache.java | 18 ++++++--------- .../kafka/streams/state/internals/ThreadCache.java | 24 +++++++++---------- .../streams/state/internals/NamedCacheTest.java | 27 ---------------------- .../streams/state/internals/ThreadCacheTest.java | 3 ++- 4 files changed, 21 insertions(+), 51 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index 3ce7cbe..0201f20 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -33,13 +35,11 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; class NamedCache { private static final Logger log = LoggerFactory.getLogger(NamedCache.class); private final String name; - private final TreeMap<Bytes, LRUNode> cache = new TreeMap<>(); + private final 
NavigableMap<Bytes, LRUNode> cache = new ConcurrentSkipListMap<>(); private final Set<Bytes> dirtyKeys = new LinkedHashSet<>(); private ThreadCache.DirtyEntryFlushListener listener; private LRUNode tail; @@ -266,16 +266,12 @@ class NamedCache { return cache.size(); } - synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) { - return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true)); + synchronized Iterator<Map.Entry<Bytes, LRUNode>> subMapIterator(final Bytes from, final Bytes to) { + return cache.subMap(from, true, to, true).entrySet().iterator(); } - private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) { - return new TreeSet<>(keySet).iterator(); - } - - synchronized Iterator<Bytes> allKeys() { - return keySetIterator(cache.navigableKeySet()); + synchronized Iterator<Map.Entry<Bytes, LRUNode>> allIterator() { + return cache.entrySet().iterator(); } synchronized LRUCacheEntry first() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 941b522..0db6c78 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.internals.NamedCache.LRUNode; import org.slf4j.Logger; import java.util.Collections; @@ -180,17 +181,17 @@ public class ThreadCache { public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) { final NamedCache cache = getCache(namespace); if (cache == null) { - return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new 
NamedCache(namespace, this.metrics)); + return new MemoryLRUCacheBytesIterator(Collections.emptyIterator()); } - return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache); + return new MemoryLRUCacheBytesIterator(cache.subMapIterator(from, to)); } public MemoryLRUCacheBytesIterator all(final String namespace) { final NamedCache cache = getCache(namespace); if (cache == null) { - return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics)); + return new MemoryLRUCacheBytesIterator(Collections.emptyIterator()); } - return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache); + return new MemoryLRUCacheBytesIterator(cache.allIterator()); } public long size() { @@ -260,13 +261,11 @@ public class ThreadCache { } static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> { - private final Iterator<Bytes> keys; - private final NamedCache cache; + private final Iterator<Map.Entry<Bytes, LRUNode>> underlying; private KeyValue<Bytes, LRUCacheEntry> nextEntry; - MemoryLRUCacheBytesIterator(final Iterator<Bytes> keys, final NamedCache cache) { - this.keys = keys; - this.cache = cache; + MemoryLRUCacheBytesIterator(final Iterator<Map.Entry<Bytes, LRUNode>> underlying) { + this.underlying = underlying; } public Bytes peekNextKey() { @@ -290,7 +289,7 @@ public class ThreadCache { return true; } - while (keys.hasNext() && nextEntry == null) { + while (underlying.hasNext() && nextEntry == null) { internalNext(); } @@ -308,8 +307,9 @@ public class ThreadCache { } private void internalNext() { - final Bytes cacheKey = keys.next(); - final LRUCacheEntry entry = cache.get(cacheKey); + final Map.Entry<Bytes, LRUNode> mapEntry = underlying.next(); + final Bytes cacheKey = mapEntry.getKey(); + final LRUCacheEntry entry = mapEntry.getValue().entry(); if (entry == null) { return; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 394feed..6c82209 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -32,7 +32,6 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -40,7 +39,6 @@ import java.util.Map; import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -209,31 +207,6 @@ public class NamedCacheTest { } @Test - public void shouldGetRangeIteratorOverKeys() { - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, "")); - cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20})); - cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, "")); - - final Iterator<Bytes> iterator = cache.keyRange(Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{2})); - assertEquals(Bytes.wrap(new byte[]{1}), iterator.next()); - assertEquals(Bytes.wrap(new byte[]{2}), iterator.next()); - assertFalse(iterator.hasNext()); - } - - @Test - public void shouldGetIteratorOverAllKeys() { - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, "")); - cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20})); - cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, "")); - - final Iterator<Bytes> iterator = cache.allKeys(); - assertEquals(Bytes.wrap(new byte[]{0}), 
iterator.next()); - assertEquals(Bytes.wrap(new byte[]{1}), iterator.next()); - assertEquals(Bytes.wrap(new byte[]{2}), iterator.next()); - assertFalse(iterator.hasNext()); - } - - @Test public void shouldNotThrowNullPointerWhenCacheIsEmptyAndEvictionCalled() { cache.evict(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index a7a64c4..5882ee4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -310,10 +310,11 @@ public class ThreadCacheTest { } assertEquals(5, cache.size()); - final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5})); // should evict byte[] {0} cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6})); + final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5})); + assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey()); }