Repository: kafka Updated Branches: refs/heads/trunk d3b8ff024 -> 9e4548df3
KAFKA-4863; Querying window store may return unwanted keys Make sure that the iterator returned from `WindowStore.fetch(..)` only returns matching keys, rather than all keys that are a prefix match. Author: Damian Guy <damian....@gmail.com> Reviewers: Eno Thereska, Guozhang Wang Closes #2662 from dguy/kafka-4863 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e4548df Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e4548df Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e4548df Branch: refs/heads/trunk Commit: 9e4548df30d50a56ae99cb3383f1a3f97bbe77bb Parents: d3b8ff0 Author: Damian Guy <damian....@gmail.com> Authored: Tue Mar 14 14:21:03 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Mar 14 14:21:03 2017 -0700 ---------------------------------------------------------------------- .../state/internals/CachingSessionStore.java | 52 -- .../state/internals/CachingWindowStore.java | 12 +- .../state/internals/FilteredCacheIterator.java | 73 ++ .../state/internals/WindowKeySchema.java | 22 +- .../state/internals/CachingWindowStoreTest.java | 25 +- .../internals/FilteredCacheIteratorTest.java | 118 +++ .../state/internals/RocksDBWindowStoreTest.java | 843 ++++++++++--------- 7 files changed, 663 insertions(+), 482 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 58c0133..a4b46ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; @@ -31,7 +30,6 @@ import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StateSerdes; import java.util.List; -import java.util.NoSuchElementException; class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> { @@ -162,54 +160,4 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i this.flushListener = flushListener; } - private static class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> { - private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator; - private final HasNextCondition hasNextCondition; - - FilteredCacheIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator, - final HasNextCondition hasNextCondition) { - this.cacheIterator = cacheIterator; - this.hasNextCondition = hasNextCondition; - } - - @Override - public void close() { - // no-op - } - - @Override - public Bytes peekNextKey() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - return cacheIterator.peekNextKey(); - } - - @Override - public boolean hasNext() { - return hasNextCondition.hasNext(cacheIterator); - } - - @Override - public KeyValue<Bytes, LRUCacheEntry> next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - return cacheIterator.next(); - - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public KeyValue<Bytes, LRUCacheEntry> peekNext() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - return cacheIterator.peekNext(); - } - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 7ea2fa4..4003e54 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -38,6 +38,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl private final Serde<K> keySerde; private final Serde<V> valueSerde; private final long windowSize; + private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema(); private String name; private ThreadCache cache; @@ -149,9 +150,16 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl Bytes fromBytes = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes); Bytes toBytes = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes); - final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo); + final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key)); + final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, fromBytes, toBytes); - return new MergedSortedCacheWindowStoreIterator<>(cacheIterator, + + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes, + timeFrom, + timeTo); + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition); + + return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator, underlyingIterator, new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde())); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java new file mode 100644 index 0000000..19370b9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; + +import java.util.NoSuchElementException; + +class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> { + private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator; + private final HasNextCondition hasNextCondition; + + FilteredCacheIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, + final HasNextCondition hasNextCondition) { + this.cacheIterator = cacheIterator; + this.hasNextCondition = hasNextCondition; + } + + @Override + public void close() { + // no-op + } + + @Override + public Bytes peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return cacheIterator.peekNextKey(); + } + + @Override + public boolean hasNext() { + return hasNextCondition.hasNext(cacheIterator); + } + + @Override + public KeyValue<Bytes, LRUCacheEntry> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return cacheIterator.next(); + + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public KeyValue<Bytes, LRUCacheEntry> peekNext() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return cacheIterator.peekNext(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java index 7ed598e..0a89da7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java @@ -24,12 +24,6 @@ import org.apache.kafka.streams.state.StateSerdes; import java.util.List; class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { - private static final HasNextCondition ITERATOR_HAS_NEXT = new HasNextCondition() { - @Override - public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) { - return iterator.hasNext(); - } - }; private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray()); @Override @@ -49,7 +43,21 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { @Override public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) { - return ITERATOR_HAS_NEXT; + return new HasNextCondition() { + @Override + public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) { + if (iterator.hasNext()) { + final Bytes bytes = iterator.peekNextKey(); + final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get()); + if (!keyBytes.equals(binaryKey)) { + return false; + } + final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get()); + return time >= from && time <= to; + } + return false; + } + }; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index c7b6846..297a88e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; @@ -30,12 +31,17 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.TestUtils; +import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.List; import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize; +import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -53,13 +59,12 @@ public class CachingWindowStoreTest { private ThreadCache cache; private String topic; private WindowKeySchema keySchema; - private RocksDBWindowStore<Bytes, byte[]> windowStore; @Before public void setUp() throws Exception { keySchema = new WindowKeySchema(); underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema); - windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false); + final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false); cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>(); cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(), @@ -73,6 +78,10 @@ public class CachingWindowStoreTest { cachingStore.init(context, cachingStore); } + @After + public void closeStore() { + cachingStore.close(); + } @Test public void shouldPutFetchFromCache() throws Exception { @@ -179,6 +188,18 @@ public class CachingWindowStoreTest { cachingStore.put("a", "a"); } + @SuppressWarnings("unchecked") + @Test + public void shouldFetchAndIterateOverExactKeys() throws Exception { + cachingStore.put("a", "0001", 0); + cachingStore.put("aa", "0002", 0); + cachingStore.put("a", "0003", 1); + cachingStore.put("aa", "0004", 1); + cachingStore.put("a", "0005", 60000); + + final List<KeyValue<Long, String>> expected = Utils.mkList(KeyValue.pair(0L, "0001"), KeyValue.pair(1L, "0003"), KeyValue.pair(60000L, "0005")); + assertThat(toList(cachingStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected)); + } private int addItemsToCache() throws IOException { int cachedSize = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java new file mode 100644 index 0000000..acded8c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FilteredCacheIteratorTest { + + @SuppressWarnings("unchecked") + private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("name", null, null); + private final KeyValue<Bytes, LRUCacheEntry> firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()), + new LRUCacheEntry("1".getBytes())); + private final List<KeyValue<Bytes, LRUCacheEntry>> entries = Utils.mkList( + firstEntry, + KeyValue.pair(Bytes.wrap("b".getBytes()), + new LRUCacheEntry("2".getBytes())), + KeyValue.pair(Bytes.wrap("c".getBytes()), + new LRUCacheEntry("3".getBytes()))); + + private FilteredCacheIterator allIterator; + private FilteredCacheIterator firstEntryIterator; + + @Before + public void before() { + store.putAll(entries); + final HasNextCondition allCondition = new HasNextCondition() { + @Override + public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) { + return iterator.hasNext(); + } + }; + allIterator = new FilteredCacheIterator( + new DelegatingPeekingKeyValueIterator<>("", + store.all()), allCondition); + + final HasNextCondition firstEntryCondition = new HasNextCondition() { + @Override + public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) { + return iterator.hasNext() && iterator.peekNextKey().equals(firstEntry.key); + } + }; + firstEntryIterator = new FilteredCacheIterator( + new DelegatingPeekingKeyValueIterator<>("", + store.all()), firstEntryCondition); + + } + + @Test + public void shouldAllowEntryMatchingHasNextCondition() throws Exception { + final List<KeyValue<Bytes, LRUCacheEntry>> keyValues = toList(allIterator); + assertThat(keyValues, equalTo(entries)); + } + + @Test + public void shouldPeekNextKey() throws Exception { + while (allIterator.hasNext()) { + final Bytes nextKey = allIterator.peekNextKey(); + final KeyValue<Bytes, LRUCacheEntry> next = allIterator.next(); + assertThat(next.key, equalTo(nextKey)); + } + } + + @Test + public void shouldPeekNext() throws Exception { + while (allIterator.hasNext()) { + final KeyValue<Bytes, LRUCacheEntry> peeked = allIterator.peekNext(); + final KeyValue<Bytes, LRUCacheEntry> next = allIterator.next(); + assertThat(peeked, equalTo(next)); + } + } + + @Test + public void shouldNotHaveNextIfHasNextConditionNotMet() throws Exception { + assertTrue(firstEntryIterator.hasNext()); + firstEntryIterator.next(); + assertFalse(firstEntryIterator.hasNext()); + } + + @Test + public void shouldFilterEntriesNotMatchingHasNextCondition() throws Exception { + final List<KeyValue<Bytes, LRUCacheEntry>> keyValues = toList(firstEntryIterator); + assertThat(keyValues, equalTo(Utils.mkList(firstEntry))); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowUnsupportedOperationExeceptionOnRemove() throws Exception { + allIterator.remove(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index b7dd942..7352673 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.TestUtils; +import org.junit.After; import org.junit.Test; import java.io.File; @@ -47,6 +48,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -87,6 +90,7 @@ public class RocksDBWindowStoreTest { private final File baseDir = TestUtils.tempDirectory("test"); private final MockProcessorContext context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache); + private WindowStore windowStore; @SuppressWarnings("unchecked") private <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) { @@ -96,9 +100,15 @@ public class RocksDBWindowStoreTest { return store; } + @After + public void closeStore() { + windowStore.close(); + } + + @SuppressWarnings("unchecked") @Test public void shouldOnlyIterateOpenSegments() throws Exception { - final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true); + windowStore = createWindowStore(context, false, true); long currentTime = 0; context.setRecordContext(createRecordContext(currentTime)); windowStore.put(1, "one"); @@ -128,494 +138,461 @@ public class RocksDBWindowStoreTest { return new ProcessorRecordContext(time, 0, 0, "topic"); } + @SuppressWarnings("unchecked") @Test public void testPutAndFetch() throws IOException { - WindowStore<Integer, String> store = createWindowStore(context, false, true); - try { - long startTime = segmentSize - 4L; - - putFirstBatch(store, startTime, context); - - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize))); - - putSecondBatch(store, startTime, context); - - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize))); - assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize))); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - - Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); - - assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); - assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); - assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); - assertNull(entriesByKey.get(3)); - assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); - assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); - assertNull(entriesByKey.get(6)); - - } finally { - store.close(); - } + windowStore = createWindowStore(context, false, true); + long startTime = segmentSize - 4L; + + putFirstBatch(windowStore, startTime, context); + + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize))); + assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize))); + + putSecondBatch(windowStore, startTime, context); + + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize))); + assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize))); + + // Flush the store and verify all current entries were properly flushed ... + windowStore.flush(); + + Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); + + assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); + assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); + assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); + assertNull(entriesByKey.get(3)); + assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); + assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); + assertNull(entriesByKey.get(6)); } + @SuppressWarnings("unchecked") @Test public void testPutAndFetchBefore() throws IOException { - WindowStore<Integer, String> store = createWindowStore(context, false, true); - try { - long startTime = segmentSize - 4L; - - putFirstBatch(store, startTime, context); - - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L))); - - putSecondBatch(store, startTime, context); - - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); - assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L))); - assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L))); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - - Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); - - assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); - assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); - assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); - assertNull(entriesByKey.get(3)); - assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); - assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); - assertNull(entriesByKey.get(6)); - - } finally { - store.close(); - } + windowStore = createWindowStore(context, false, true); + long startTime = segmentSize - 4L; + + putFirstBatch(windowStore, startTime, context); + + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L))); + assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L))); + + putSecondBatch(windowStore, startTime, context); + + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - windowSize, startTime + 0L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L))); + assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L))); + assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - windowSize, startTime + 13L))); + + // Flush the store and verify all current entries were properly flushed ... + windowStore.flush(); + + Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); + + assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); + assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); + assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); + assertNull(entriesByKey.get(3)); + assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); + assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); + assertNull(entriesByKey.get(6)); } + @SuppressWarnings("unchecked") @Test public void testPutAndFetchAfter() throws IOException { - WindowStore<Integer, String> store = createWindowStore(context, false, true); - try { - long startTime = segmentSize - 4L; - - putFirstBatch(store, startTime, context); - - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize))); - - putSecondBatch(store, startTime, context); - - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize))); - assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize))); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - - Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); - - assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); - assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); - assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); - assertNull(entriesByKey.get(3)); - assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); - assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); - assertNull(entriesByKey.get(6)); - - } finally { - store.close(); - } + windowStore = createWindowStore(context, false, true); + long startTime = segmentSize - 4L; + + putFirstBatch(windowStore, startTime, context); + + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + windowSize))); + assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + windowSize))); + + putSecondBatch(windowStore, startTime, context); + + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + windowSize))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + windowSize))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + windowSize))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + windowSize))); + assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + windowSize))); + + // Flush the store and verify all current entries were properly flushed ... + windowStore.flush(); + + Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); + + assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); + assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); + assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); + assertNull(entriesByKey.get(3)); + assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); + assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); + assertNull(entriesByKey.get(6)); } + @SuppressWarnings("unchecked") @Test public void testPutSameKeyTimestamp() throws IOException { - WindowStore<Integer, String> store = createWindowStore(context, false, true); - try { - long startTime = segmentSize - 4L; - - context.setRecordContext(createRecordContext(startTime)); - store.put(0, "zero"); + windowStore = createWindowStore(context, false, true); + long startTime = segmentSize - 4L; - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + context.setRecordContext(createRecordContext(startTime)); + windowStore.put(0, "zero"); - store.put(0, "zero"); - store.put(0, "zero+"); - store.put(0, "zero++"); + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize))); + windowStore.put(0, "zero"); + windowStore.put(0, "zero+"); + windowStore.put(0, "zero++"); - // Flush the store and verify all current entries were properly flushed ... - store.flush(); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize))); - Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); + // Flush the store and verify all current entries were properly flushed ... + windowStore.flush(); - assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0)); + Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); - } finally { - store.close(); - } + assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0)); } @Test public void testCachingEnabled() throws IOException { - WindowStore<Integer, String> store = createWindowStore(context, true, false); - assertTrue(store instanceof CachedStateStore); + windowStore = createWindowStore(context, true, false); + assertTrue(windowStore instanceof CachedStateStore); } + @SuppressWarnings("unchecked") @Test public void testRolling() throws IOException { - WindowStore<Integer, String> store = createWindowStore(context, false, true); + windowStore = createWindowStore(context, false, true); + + // to validate segments + final Segments segments = new Segments(windowName, retentionPeriod, numSegments); + long startTime = segmentSize * 2; + long incr = segmentSize / 2; + context.setRecordContext(createRecordContext(startTime)); + windowStore.put(0, "zero"); + assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir)); + + context.setRecordContext(createRecordContext(startTime + incr)); + windowStore.put(1, "one"); + assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir)); + + context.setRecordContext(createRecordContext(startTime + incr * 2)); + windowStore.put(2, "two"); + assertEquals(Utils.mkSet(segments.segmentName(2), + segments.segmentName(3)), segmentDirs(baseDir)); + + context.setRecordContext(createRecordContext(startTime + incr * 4)); + windowStore.put(4, "four"); + assertEquals(Utils.mkSet(segments.segmentName(2), + segments.segmentName(3), + segments.segmentName(4)), segmentDirs(baseDir)); + + + context.setRecordContext(createRecordContext(startTime + incr * 5)); + windowStore.put(5, "five"); + assertEquals(Utils.mkSet(segments.segmentName(2), + segments.segmentName(3), + segments.segmentName(4)), segmentDirs(baseDir)); + + assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + + context.setRecordContext(createRecordContext(startTime + incr * 6)); + windowStore.put(6, "six"); + assertEquals(Utils.mkSet(segments.segmentName(3), + segments.segmentName(4), + segments.segmentName(5)), segmentDirs(baseDir)); + + + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + + + context.setRecordContext(createRecordContext(startTime + incr * 7)); + windowStore.put(7, "seven"); + assertEquals(Utils.mkSet(segments.segmentName(3), + segments.segmentName(4), + segments.segmentName(5)), segmentDirs(baseDir)); + + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + + context.setRecordContext(createRecordContext(startTime + incr * 8)); + windowStore.put(8, "eight"); + assertEquals(Utils.mkSet(segments.segmentName(4), + segments.segmentName(5), + segments.segmentName(6)), segmentDirs(baseDir)); + + + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); + + // check segment directories + windowStore.flush(); + assertEquals(Utils.mkSet(segments.segmentName(4), + segments.segmentName(5), + segments.segmentName(6)), segmentDirs(baseDir)); + - try { - // to validate segments - final Segments segments = new Segments(windowName, retentionPeriod, numSegments); - long startTime = segmentSize * 2; - long incr = segmentSize / 2; - context.setRecordContext(createRecordContext(startTime)); - store.put(0, "zero"); - assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir)); - - context.setRecordContext(createRecordContext(startTime + incr)); - store.put(1, "one"); - assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir)); - - context.setRecordContext(createRecordContext(startTime + incr * 2)); - store.put(2, "two"); - assertEquals(Utils.mkSet(segments.segmentName(2), - segments.segmentName(3)), segmentDirs(baseDir)); - - context.setRecordContext(createRecordContext(startTime + incr * 4)); - store.put(4, "four"); - assertEquals(Utils.mkSet(segments.segmentName(2), - segments.segmentName(3), - segments.segmentName(4)), segmentDirs(baseDir)); - - - context.setRecordContext(createRecordContext(startTime + incr * 5)); - store.put(5, "five"); - assertEquals(Utils.mkSet(segments.segmentName(2), - segments.segmentName(3), - segments.segmentName(4)), segmentDirs(baseDir)); - - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - - context.setRecordContext(createRecordContext(startTime + incr * 6)); - store.put(6, "six"); - assertEquals(Utils.mkSet(segments.segmentName(3), - segments.segmentName(4), - segments.segmentName(5)), segmentDirs(baseDir)); - - - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - - - context.setRecordContext(createRecordContext(startTime + incr * 7)); - store.put(7, "seven"); - assertEquals(Utils.mkSet(segments.segmentName(3), - segments.segmentName(4), - segments.segmentName(5)), segmentDirs(baseDir)); - - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - - context.setRecordContext(createRecordContext(startTime + incr * 8)); - store.put(8, "eight"); - assertEquals(Utils.mkSet(segments.segmentName(4), - segments.segmentName(5), - segments.segmentName(6)), segmentDirs(baseDir)); - - - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); - - // check segment directories - store.flush(); - assertEquals(Utils.mkSet(segments.segmentName(4), - segments.segmentName(5), - segments.segmentName(6)), segmentDirs(baseDir)); - - } finally { - store.close(); - } } + @SuppressWarnings("unchecked") @Test public void testRestore() throws IOException { long startTime = segmentSize * 2; long incr = segmentSize / 2; - WindowStore<Integer, String> store = createWindowStore(context, false, true); - try { - context.setRecordContext(createRecordContext(startTime)); - store.put(0, "zero"); - context.setRecordContext(createRecordContext(startTime + incr)); - store.put(1, "one"); - context.setRecordContext(createRecordContext(startTime + incr * 2)); - store.put(2, "two"); - context.setRecordContext(createRecordContext(startTime + incr * 3)); - store.put(3, "three"); - context.setRecordContext(createRecordContext(startTime + incr * 4)); - store.put(4, "four"); - context.setRecordContext(createRecordContext(startTime + incr * 5)); - store.put(5, "five"); - context.setRecordContext(createRecordContext(startTime + incr * 6)); - store.put(6, "six"); - context.setRecordContext(createRecordContext(startTime + incr * 7)); - store.put(7, "seven"); - context.setRecordContext(createRecordContext(startTime + incr * 8)); - store.put(8, "eight"); - store.flush(); - - } finally { - store.close(); - } + windowStore = createWindowStore(context, false, true); + context.setRecordContext(createRecordContext(startTime)); + windowStore.put(0, "zero"); + context.setRecordContext(createRecordContext(startTime + incr)); + windowStore.put(1, "one"); + context.setRecordContext(createRecordContext(startTime + incr * 2)); + windowStore.put(2, "two"); + context.setRecordContext(createRecordContext(startTime + incr * 3)); + windowStore.put(3, "three"); + context.setRecordContext(createRecordContext(startTime + incr * 4)); + windowStore.put(4, "four"); + context.setRecordContext(createRecordContext(startTime + incr * 5)); + windowStore.put(5, "five"); + context.setRecordContext(createRecordContext(startTime + incr * 6)); + windowStore.put(6, "six"); + context.setRecordContext(createRecordContext(startTime + incr * 7)); + windowStore.put(7, "seven"); + context.setRecordContext(createRecordContext(startTime + incr * 8)); + windowStore.put(8, "eight"); + windowStore.flush(); + + windowStore.close(); // remove local store image Utils.delete(baseDir); - WindowStore<Integer, String> store2 = createWindowStore(context, false, true); - assertEquals(Utils.mkList(), toList(store2.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); - - try { - context.restore(windowName, changeLog); - - assertEquals(Utils.mkList(), toList(store2.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store2.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store2.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store2.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(store2.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList("seven"), toList(store2.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - assertEquals(Utils.mkList("eight"), toList(store2.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); - - // check segment directories - store2.flush(); - assertEquals( - Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)), - segmentDirs(baseDir) - ); - } finally { - store2.close(); - } + windowStore = createWindowStore(context, false, true); + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); + + context.restore(windowName, changeLog); + + assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); + + // check segment directories + windowStore.flush(); + assertEquals( + Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)), + segmentDirs(baseDir) + ); } + @SuppressWarnings("unchecked") @Test public void testSegmentMaintenance() throws IOException { - WindowStore<Integer, String> store = createWindowStore(context, false, true); - try { - context.setTime(0L); - context.setRecordContext(createRecordContext(0)); - store.put(0, "v"); - assertEquals( - Utils.mkSet(segments.segmentName(0L)), - segmentDirs(baseDir) - ); - - context.setRecordContext(createRecordContext(59999)); - store.put(0, "v"); - store.put(0, "v"); - assertEquals( - Utils.mkSet(segments.segmentName(0L)), - segmentDirs(baseDir) - ); - - context.setRecordContext(createRecordContext(60000)); - store.put(0, "v"); - assertEquals( - Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), - segmentDirs(baseDir) - ); - - WindowStoreIterator iter; - int fetchedCount; - - iter = store.fetch(0, 0L, 240000L); - fetchedCount = 0; - while (iter.hasNext()) { - iter.next(); - fetchedCount++; - } - assertEquals(4, fetchedCount); + windowStore = createWindowStore(context, false, true); + context.setTime(0L); + context.setRecordContext(createRecordContext(0)); + windowStore.put(0, "v"); + assertEquals( + Utils.mkSet(segments.segmentName(0L)), + segmentDirs(baseDir) + ); + + context.setRecordContext(createRecordContext(59999)); + windowStore.put(0, "v"); + windowStore.put(0, "v"); + assertEquals( + Utils.mkSet(segments.segmentName(0L)), + segmentDirs(baseDir) + ); + + context.setRecordContext(createRecordContext(60000)); + windowStore.put(0, "v"); + assertEquals( + Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), + segmentDirs(baseDir) + ); + + WindowStoreIterator iter; + int fetchedCount; + + iter = windowStore.fetch(0, 0L, 240000L); + fetchedCount = 0; + while (iter.hasNext()) { + iter.next(); + fetchedCount++; + } + assertEquals(4, fetchedCount); - assertEquals( - Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), - segmentDirs(baseDir) - ); + assertEquals( + Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), + segmentDirs(baseDir) + ); - context.setRecordContext(createRecordContext(180000)); - store.put(0, "v"); + context.setRecordContext(createRecordContext(180000)); + windowStore.put(0, "v"); - iter = store.fetch(0, 0L, 240000L); - fetchedCount = 0; - while (iter.hasNext()) { - iter.next(); - fetchedCount++; - } - assertEquals(2, fetchedCount); + iter = windowStore.fetch(0, 0L, 240000L); + fetchedCount = 0; + while (iter.hasNext()) { + iter.next(); + fetchedCount++; + } + assertEquals(2, fetchedCount); - assertEquals( - Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)), - segmentDirs(baseDir) - ); + assertEquals( + Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)), + segmentDirs(baseDir) + ); - context.setRecordContext(createRecordContext(300000)); - store.put(0, "v"); + context.setRecordContext(createRecordContext(300000)); + windowStore.put(0, "v"); - iter = store.fetch(0, 240000L, 1000000L); - fetchedCount = 0; - while (iter.hasNext()) { - iter.next(); - fetchedCount++; - } - assertEquals(1, fetchedCount); + iter = windowStore.fetch(0, 240000L, 1000000L); + fetchedCount = 0; + while (iter.hasNext()) { + iter.next(); + fetchedCount++; + } + assertEquals(1, fetchedCount); - assertEquals( - Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)), - segmentDirs(baseDir) - ); + assertEquals( + Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)), + segmentDirs(baseDir) + ); - } finally { - store.close(); - } } + @SuppressWarnings("unchecked") @Test public void testInitialLoading() throws IOException { File storeDir = new File(baseDir, windowName); - WindowStore<Integer, String> store = createWindowStore(context, false, true); + windowStore = createWindowStore(context, false, true); - try { - new File(storeDir, segments.segmentName(0L)).mkdir(); - new File(storeDir, segments.segmentName(1L)).mkdir(); - new File(storeDir, segments.segmentName(2L)).mkdir(); - new File(storeDir, segments.segmentName(3L)).mkdir(); - new File(storeDir, segments.segmentName(4L)).mkdir(); - new File(storeDir, segments.segmentName(5L)).mkdir(); - new File(storeDir, segments.segmentName(6L)).mkdir(); - } finally { - store.close(); - } + new File(storeDir, segments.segmentName(0L)).mkdir(); + new File(storeDir, segments.segmentName(1L)).mkdir(); + new File(storeDir, segments.segmentName(2L)).mkdir(); + new File(storeDir, segments.segmentName(3L)).mkdir(); + new File(storeDir, segments.segmentName(4L)).mkdir(); + new File(storeDir, segments.segmentName(5L)).mkdir(); + new File(storeDir, segments.segmentName(6L)).mkdir(); + windowStore.close(); - store = createWindowStore(context, false, true); + windowStore = createWindowStore(context, false, true); - try { - assertEquals( - Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)), - segmentDirs(baseDir) - ); + assertEquals( + Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)), + segmentDirs(baseDir) + ); - try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) { - while (iter.hasNext()) { - iter.next(); - } + try (WindowStoreIterator iter = windowStore.fetch(0, 0L, 1000000L)) { + while (iter.hasNext()) { + iter.next(); } - - assertEquals( - Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)), - segmentDirs(baseDir) - ); - - } finally { - store.close(); } + + assertEquals( + Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)), + segmentDirs(baseDir) + ); } + @SuppressWarnings("unchecked") @Test public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() throws Exception { - final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true); + windowStore = createWindowStore(context, false, true); context.setRecordContext(createRecordContext(0)); windowStore.put(1, "one", 1L); windowStore.put(1, "two", 2L); @@ -639,6 +616,34 @@ public class RocksDBWindowStoreTest { } } + @SuppressWarnings("unchecked") + @Test + public void shouldFetchAndIterateOverExactKeys() throws Exception { + final RocksDBWindowStoreSupplier<String, String> supplier = + new RocksDBWindowStoreSupplier<>( + "window", + 60 * 1000L * 2, 3, + true, + Serdes.String(), + Serdes.String(), + windowSize, + true, + Collections.<String, String>emptyMap(), + false); + + windowStore = supplier.get(); + windowStore.init(context, windowStore); + + windowStore.put("a", "0001", 0); + windowStore.put("aa", "0002", 0); + windowStore.put("a", "0003", 1); + windowStore.put("aa", "0004", 1); + windowStore.put("a", "0005", 60000); + + final List expected = Utils.mkList("0001", "0003", "0005"); + assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected)); + } + private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final MockProcessorContext context) { context.setRecordContext(createRecordContext(startTime)); store.put(0, "zero");