This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new f81189c KAFKA-8736: Track size in InMemoryKeyValueStore (#7177) f81189c is described below commit f81189cd3915f0876f8e2ed440269c9c0ecea1e9 Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Tue Aug 13 14:54:58 2019 -0700 KAFKA-8736: Track size in InMemoryKeyValueStore (#7177) InMemoryKeyValueStore uses ConcurrentSkipListMap#size which takes linear time as it iterates over the entire map. We should just track size ourselves for approximateNumEntries Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org> --- .../state/internals/InMemoryKeyValueStore.java | 23 ++++++++++------------ 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index aa7b2cc..2d68214 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -35,6 +35,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { private final String name; private final ConcurrentNavigableMap<Bytes, byte[]> map = new ConcurrentSkipListMap<>(); private volatile boolean open = false; + private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class); @@ -50,17 +51,10 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { @Override public void init(final ProcessorContext context, final StateStore root) { - + size = 0; if (root != null) { // register the store - context.register(root, (key, value) -> { - // this is a delete - if (value == null) { - delete(Bytes.wrap(key)); - } else { - put(Bytes.wrap(key), value); - } - }); + context.register(root, (key, value) -> put(Bytes.wrap(key), value)); } open = true; @@ -84,9 +78,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { @Override public void put(final Bytes key, final byte[] value) { if (value == null) { - map.remove(key); + size -= map.remove(key) == null ? 0 : 1; } else { - map.put(key, value); + size += map.put(key, value) == null ? 1 : 0; } } @@ -108,7 +102,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { @Override public byte[] delete(final Bytes key) { - return map.remove(key); + final byte[] oldValue = map.remove(key); + size -= oldValue == null ? 0 : 1; + return oldValue; } @Override @@ -135,7 +131,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { @Override public long approximateNumEntries() { - return map.size(); + return size; } @Override @@ -146,6 +142,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { @Override public void close() { map.clear(); + size = 0; open = false; }