This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 4788863 KAFKA-8007: Avoid copying on fetch in InMemoryWindowStore (#6335) 4788863 is described below commit 47888630a05260f548988d943a633120845f767d Author: A. Sophie Blee-Goldman <ableegold...@gmail.com> AuthorDate: Wed Mar 6 11:02:27 2019 -0800 KAFKA-8007: Avoid copying on fetch in InMemoryWindowStore (#6335) Rewrote the InMemoryWindowStore implementation by moving the work of a fetch to the iterator, and cleaned up the iterators as well. Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com> --- .../state/internals/InMemoryWindowStore.java | 296 ++++++++++++--------- .../state/internals/InMemoryWindowStoreTest.java | 108 +++++++- 2 files changed, 284 insertions(+), 120 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 67eec0d..7d1b279 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -17,6 +17,11 @@ package org.apache.kafka.streams.state.internals; import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; @@ -33,20 +38,15 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableMap; import java.util.NoSuchElementException; -import java.util.TreeMap; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes; import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp; + public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class); @@ -63,22 +63,24 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { private final long windowSize; private final boolean retainDuplicates; - private final NavigableMap<Long, NavigableMap<Bytes, byte[]>> segmentMap; + private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>> segmentMap; + private final Set<InMemoryWindowStoreIteratorWrapper> openIterators; private volatile boolean open = false; InMemoryWindowStore(final String name, - final long retentionPeriod, - final long windowSize, - final boolean retainDuplicates, - final String metricScope) { + final long retentionPeriod, + final long windowSize, + final boolean retainDuplicates, + final String metricScope) { this.name = name; this.retentionPeriod = retentionPeriod; this.windowSize = windowSize; this.retainDuplicates = retainDuplicates; this.metricScope = metricScope; - this.segmentMap = new TreeMap<>(); + this.openIterators = ConcurrentHashMap.newKeySet(); + this.segmentMap = new ConcurrentSkipListMap<>(); } @Override @@ -132,7 +134,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { LOG.debug("Skipping record for expired segment."); } else { if (value != null) { - this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>()); + this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>()); this.segmentMap.get(windowStartTimestamp).put(keyBytes, value); } else { this.segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> { @@ -147,7 +149,11 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { public byte[] fetch(final Bytes key, final long windowStartTimestamp) { removeExpiredSegments(); - final NavigableMap<Bytes, byte[]> kvMap = this.segmentMap.get(windowStartTimestamp); + if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) { + return null; + } + + final ConcurrentNavigableMap<Bytes, byte[]> kvMap = this.segmentMap.get(windowStartTimestamp); if (kvMap == null) { return null; } else { @@ -159,9 +165,16 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { @Override public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) { removeExpiredSegments(); - final List<KeyValue<Long, byte[]>> records = retainDuplicates ? fetchWithDuplicates(key, timeFrom, timeTo) : fetchUnique(key, timeFrom, timeTo); - return new InMemoryWindowStoreIterator(records.listIterator()); + // add one b/c records expire exactly retentionPeriod ms after created + final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1); + + if (timeTo < minTime) { + return new WrappedInMemoryWindowStoreIterator(); + } + + return new WrappedInMemoryWindowStoreIterator( + key, key, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator()); } @Deprecated @@ -171,52 +184,42 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { final long timeFrom, final long timeTo) { removeExpiredSegments(); - final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>(); // add one b/c records expire exactly retentionPeriod ms after created final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1); - final Bytes keyFrom = retainDuplicates ? wrapForDups(from, 0) : from; - final Bytes keyTo = retainDuplicates ? wrapForDups(to, Integer.MAX_VALUE) : to; - for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) { - for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) { - final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey(); - returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue())); - } + if (timeTo < minTime) { + return new WrappedWindowedKeyValueIterator(); } - return new InMemoryWindowedKeyValueIterator(returnSet.listIterator()); + + return new WrappedWindowedKeyValueIterator( + from, to, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator()); } @Deprecated @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { removeExpiredSegments(); - final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>(); // add one b/c records expire exactly retentionPeriod ms after created final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1); - for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) { - for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) { - final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey(); - returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue())); - } + if (timeTo < minTime) { + return new WrappedWindowedKeyValueIterator(); } - return new InMemoryWindowedKeyValueIterator(returnSet.listIterator()); + + return new WrappedWindowedKeyValueIterator( + null, null, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator()); } @Override public KeyValueIterator<Windowed<Bytes>, byte[]> all() { removeExpiredSegments(); - final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>(); - for (final Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.entrySet()) { - for (final Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) { - final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey(); - returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue())); - } - } - return new InMemoryWindowedKeyValueIterator(returnSet.listIterator()); + final long minTime = this.observedStreamTime - this.retentionPeriod; + + return new WrappedWindowedKeyValueIterator( + null, null, this.segmentMap.tailMap(minTime, false).entrySet().iterator()); } @Override @@ -240,47 +243,12 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { this.open = false; } - private List<KeyValue<Long, byte[]>> fetchUnique(final Bytes key, final long timeFrom, final long timeTo) { - final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>(); - - // add one b/c records expire exactly retentionPeriod ms after created - final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1); - - for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) { - final byte[] value = segmentMapEntry.getValue().get(key); - if (value != null) { - returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), value)); - } - } - return returnSet; - } - - private List<KeyValue<Long, byte[]>> fetchWithDuplicates(final Bytes key, final long timeFrom, final long timeTo) { - final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>(); - - // add one b/c records expire exactly retentionPeriod ms after created - final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1); - final Bytes keyFrom = wrapForDups(key, 0); - final Bytes keyTo = wrapForDups(key, Integer.MAX_VALUE); - - for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) { - for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) { - returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), kvMapEntry.getValue())); - } - } - return returnSet; - } - private void removeExpiredSegments() { - final long minLiveTime = this.observedStreamTime - this.retentionPeriod; - this.segmentMap.headMap(minLiveTime, true).clear(); - } - - private KeyValue<Windowed<Bytes>, byte[]> getWindowedKeyValue(final Bytes key, - final long startTimestamp, - final byte[] value) { - final Windowed<Bytes> windowedK = new Windowed<>(key, new TimeWindow(startTimestamp, startTimestamp + windowSize)); - return new KeyValue<>(windowedK, value); + long minLiveTime = Math.max(0L, this.observedStreamTime - this.retentionPeriod + 1); + for (final InMemoryWindowStoreIteratorWrapper it : openIterators) { + minLiveTime = Math.min(minLiveTime, it.minTime()); + } + this.segmentMap.headMap(minLiveTime, false).clear(); } private void maybeUpdateSeqnumForDups() { @@ -304,76 +272,166 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { } - private static class InMemoryWindowStoreIterator implements WindowStoreIterator<byte[]> { + private abstract class InMemoryWindowStoreIteratorWrapper implements Comparable<InMemoryWindowStoreIteratorWrapper> { + + private Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator; + private Iterator<Map.Entry<Bytes, byte[]>> recordIterator; + private KeyValue<Bytes, byte[]> next; + private long currentTime; - private ListIterator<KeyValue<Long, byte[]>> iterator; + private final boolean allKeys; + private Bytes keyFrom; + private Bytes keyTo; - InMemoryWindowStoreIterator(final ListIterator<KeyValue<Long, byte[]>> iterator) { - this.iterator = iterator; + // Default constructor sets up a dummy iterator when no results are returned (eg entire fetch range is expired) + InMemoryWindowStoreIteratorWrapper() { + this.allKeys = false; + recordIterator = null; + } + + InMemoryWindowStoreIteratorWrapper(final Bytes keyFrom, + final Bytes keyTo, + final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) { + this.allKeys = (keyFrom == null) && (keyTo == null); + if (retainDuplicates && !allKeys) { + this.keyFrom = wrapForDups(keyFrom, 0); + this.keyTo = wrapForDups(keyTo, Integer.MAX_VALUE); + } else { + this.keyFrom = keyFrom; + this.keyTo = keyTo; + } + + this.segmentIterator = segmentIterator; + this.recordIterator = setRecordIterator(); + + openIterators.add(this); } - @Override public boolean hasNext() { - return iterator.hasNext(); + if (next != null) { + return true; + } + if (recordIterator == null || (!recordIterator.hasNext() && !segmentIterator.hasNext())) { + return false; + } + + next = getNext(); + return next != null; } - @Override - public KeyValue<Long, byte[]> next() { - return iterator.next(); + public void remove() { + throw new UnsupportedOperationException( + "remove() is not supported in " + getClass().getName()); } - @Override - public Long peekNextKey() { - if (!hasNext()) { - throw new NoSuchElementException(); + public void close() { + openIterators.remove(this); + } + + // getNext is only called when either recordIterator or segmentIterator has a next + // Note this does not guarantee a next record exists as the next segments may not contain any keys in range + protected KeyValue<Bytes, byte[]> getNext() { + while (!recordIterator.hasNext()) { + recordIterator = setRecordIterator(); + if (recordIterator == null) { + return null; + } + } + final Map.Entry<Bytes, byte[]> nextRecord = recordIterator.next(); + return new KeyValue<>(nextRecord.getKey(), nextRecord.getValue()); + } + + // Resets recordIterator to point to the next segment and returns null if there are no more segments + // Note it may not actually point to anything if no keys in range exist in the next segment + Iterator<Map.Entry<Bytes, byte[]>> setRecordIterator() { + if (!segmentIterator.hasNext()) { + return null; + } + + final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next(); + currentTime = currentSegment.getKey(); + + if (allKeys) { + return currentSegment.getValue().entrySet().iterator(); } else { - final long next = iterator.next().key; - iterator.previous(); - return next; + return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator(); } } - @Override - public void close() { - iterator = null; + Long minTime() { + return currentTime; } - } - private static class InMemoryWindowedKeyValueIterator implements KeyValueIterator<Windowed<Bytes>, byte[]> { + public int compareTo(final InMemoryWindowStoreIteratorWrapper other) { + return (int) (minTime() - other.minTime()); + } + } - ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator; + private class WrappedInMemoryWindowStoreIterator extends InMemoryWindowStoreIteratorWrapper implements WindowStoreIterator<byte[]> { - InMemoryWindowedKeyValueIterator(final ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator) { - this.iterator = iterator; + WrappedInMemoryWindowStoreIterator() { + super(); } - @Override - public boolean hasNext() { - return iterator.hasNext(); + WrappedInMemoryWindowStoreIterator(final Bytes keyFrom, + final Bytes keyTo, + final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) { + super(keyFrom, keyTo, segmentIterator); } @Override - public KeyValue<Windowed<Bytes>, byte[]> next() { - return iterator.next(); + public Long peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return super.currentTime; } @Override - public Windowed<Bytes> peekNextKey() { + public KeyValue<Long, byte[]> next() { if (!hasNext()) { throw new NoSuchElementException(); - } else { - final Windowed<Bytes> next = iterator.next().key; - iterator.previous(); - return next; } - } - @Override - public void close() { - iterator = null; + final KeyValue<Long, byte[]> result = new KeyValue<>(super.currentTime, super.next.value); + super.next = null; + return result; } } -} + private class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper implements KeyValueIterator<Windowed<Bytes>, byte[]> { + + WrappedWindowedKeyValueIterator() { + super(); + } + + WrappedWindowedKeyValueIterator(final Bytes keyFrom, + final Bytes keyTo, + final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) { + super(keyFrom, keyTo, segmentIterator); + } + + public Windowed<Bytes> peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return getWindowedKey(); + } + + public KeyValue<Windowed<Bytes>, byte[]> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final KeyValue<Windowed<Bytes>, byte[]> result = new KeyValue<>(getWindowedKey(), super.next.value); + super.next = null; + return result; + } + private Windowed<Bytes> getWindowedKey() { + final Bytes key = retainDuplicates ? getKey(super.next.key) : super.next.key; + final TimeWindow timeWindow = new TimeWindow(super.currentTime, super.currentTime + windowSize); + return new Windowed<>(key, timeWindow); + } + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java index 5de4b44..e7f5ed0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java @@ -361,7 +361,7 @@ public class InMemoryWindowStoreTest { setCurrentTime(currentTime); windowStore.put(1, "five"); - final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime); + KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime); // effect of this put (expires next oldest record, adds new one) should not be reflected in the already fetched results currentTime = currentTime + retentionPeriod / 4; @@ -375,6 +375,15 @@ public class InMemoryWindowStoreTest { assertEquals(windowedPair(1, "four", 3 * (retentionPeriod / 4)), iterator.next()); assertEquals(windowedPair(1, "five", retentionPeriod), iterator.next()); assertFalse(iterator.hasNext()); + + iterator = windowStore.fetchAll(0L, currentTime); + + // If we fetch again after the last put, the second oldest record should have expired and newest should appear in results + assertEquals(windowedPair(1, "three", retentionPeriod / 2), iterator.next()); + assertEquals(windowedPair(1, "four", 3 * (retentionPeriod / 4)), iterator.next()); + assertEquals(windowedPair(1, "five", retentionPeriod), iterator.next()); + assertEquals(windowedPair(1, "six", 5 * (retentionPeriod / 4)), iterator.next()); + assertFalse(iterator.hasNext()); } @Test @@ -473,4 +482,101 @@ public class InMemoryWindowStoreTest { final List<String> messages = appender.getMessages(); assertThat(messages, hasItem("Skipping record for expired segment.")); } + + @Test + public void testIteratorMultiplePeekAndHasNext() { + windowStore = createInMemoryWindowStore(context, false); + + long currentTime = 0; + setCurrentTime(currentTime); + windowStore.put(1, "one"); + + currentTime += windowSize * 10; + setCurrentTime(currentTime); + windowStore.put(2, "two"); + + currentTime += windowSize * 10; + setCurrentTime(currentTime); + windowStore.put(3, "three"); + + final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetch(1, 4, 0L, currentTime); + + assertFalse(!iterator.hasNext()); + assertFalse(!iterator.hasNext()); + assertEquals(new Windowed<>(1, WindowKeySchema.timeWindowForSize(0L, windowSize)), iterator.peekNextKey()); + assertEquals(new Windowed<>(1, WindowKeySchema.timeWindowForSize(0L, windowSize)), iterator.peekNextKey()); + + assertEquals(windowedPair(1, "one", 0), iterator.next()); + assertEquals(windowedPair(2, "two", windowSize * 10), iterator.next()); + assertEquals(windowedPair(3, "three", windowSize * 20), iterator.next()); + assertFalse(iterator.hasNext()); + } + + @Test + public void shouldNotThrowConcurrentModificationException() { + windowStore = createInMemoryWindowStore(context, false); + + long currentTime = 0; + setCurrentTime(currentTime); + windowStore.put(1, "one"); + + currentTime += windowSize * 10; + setCurrentTime(currentTime); + windowStore.put(1, "two"); + + final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.all(); + + currentTime += windowSize * 10; + setCurrentTime(currentTime); + windowStore.put(1, "three"); + + currentTime += windowSize * 10; + setCurrentTime(currentTime); + windowStore.put(2, "four"); + + // Iterator should return all records in store and not throw exception b/c some were added after fetch + assertEquals(windowedPair(1, "one", 0), iterator.next()); + assertEquals(windowedPair(1, "two", windowSize * 10), iterator.next()); + assertEquals(windowedPair(1, "three", windowSize * 20), iterator.next()); + assertEquals(windowedPair(2, "four", windowSize * 30), iterator.next()); + assertFalse(iterator.hasNext()); + } + + @Test + public void shouldNotExpireFromOpenIterator() { + windowStore = createInMemoryWindowStore(context, false); + + windowStore.put(1, "one", 0L); + windowStore.put(1, "two", 10L); + + windowStore.put(2, "one", 5L); + windowStore.put(2, "two", 15L); + + final WindowStoreIterator<String> iterator1 = windowStore.fetch(1, 0L, 50L); + final WindowStoreIterator<String> iterator2 = windowStore.fetch(2, 0L, 50L); + + // This put expires all four previous records, but they should still be returned from already open iterators + windowStore.put(1, "four", retentionPeriod + 50L); + + assertEquals(new KeyValue<>(0L, "one"), iterator1.next()); + assertEquals(new KeyValue<>(5L, "one"), iterator2.next()); + + assertEquals(new KeyValue<>(15L, "two"), iterator2.next()); + assertEquals(new KeyValue<>(10L, "two"), iterator1.next()); + + assertFalse(iterator1.hasNext()); + assertFalse(iterator2.hasNext()); + } + + @Test + public void shouldNotThrowExceptionWhenFetchRangeIsExpired() { + windowStore = createInMemoryWindowStore(context, false); + + windowStore.put(1, "one", 0L); + windowStore.put(1, "two", retentionPeriod); + + final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L, 10L); + + assertFalse(iterator.hasNext()); + } }