Repository: kafka Updated Branches: refs/heads/trunk cc3570d1a -> a62eb5993
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java index 5189318..2f30712 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import java.text.SimpleDateFormat; @@ -34,6 +35,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute + private static final long USE_CURRENT_TIMESTAMP = -1L; + private static class Segment extends RocksDBStore<byte[], byte[]> { public final long id; @@ -73,11 +76,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { } @Override - public V next() { + public KeyValue<Long, V> next() { if (index >= iterators.length) throw new NoSuchElementException(); - return serdes.valueFrom(iterators[index].next().value()); + Entry<byte[], byte[]> entry = iterators[index].next(); + + return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()), + serdes.valueFrom(entry.value())); } @Override @@ -86,6 +92,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { iterators[index].remove(); } + @Override public void close() { for (KeyValueIterator<byte[], byte[]> iterator : iterators) { iterator.close(); @@ -94,9 +101,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { } private final String name; - private final long windowBefore; - private final long windowAfter; private final long segmentInterval; + private final boolean retainDuplicates; private final Segment[] segments; private final Serdes<K, V> serdes; private final SimpleDateFormat formatter; @@ -105,14 +111,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { private long currentSegmentId = -1L; private int seqnum = 0; - public RocksDBWindowStore(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes) { + public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) { this.name = name; - this.windowBefore = windowBefore; - this.windowAfter = windowAfter; - - // The retention period must be at least two times as long as the total window size - if ((this.windowBefore + this.windowAfter + 1) * 2 > retentionPeriod) - retentionPeriod = (this.windowBefore + this.windowAfter + 1) * 2; // The segment interval must be greater than MIN_SEGMENT_INTERVAL this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL); @@ -120,6 +120,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { this.segments = new Segment[numSegments]; this.serdes = serdes; + this.retainDuplicates = retainDuplicates; + // Create a date formatter. Formatted timestamps are used as segment name suffixes this.formatter = new SimpleDateFormat("yyyyMMddHHmm"); this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT")); @@ -158,12 +160,18 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { @Override public void put(K key, V value) { - putAndReturnInternalKey(key, value); + putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP); } @Override - public byte[] putAndReturnInternalKey(K key, V value) { - long timestamp = context.timestamp(); + public void put(K key, V value, long timestamp) { + putAndReturnInternalKey(key, value, timestamp); + } + + @Override + public byte[] putAndReturnInternalKey(K key, V value, long t) { + long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t; + long segmentId = segmentId(timestamp); if (segmentId > currentSegmentId) { @@ -174,7 +182,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { // If the record is within the retention period, put it in the store. if (segmentId > currentSegmentId - segments.length) { - seqnum = (seqnum + 1) & 0x7FFFFFFF; + if (retainDuplicates) + seqnum = (seqnum + 1) & 0x7FFFFFFF; byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes); getSegment(segmentId).put(binaryKey, serdes.rawValue(value)); return binaryKey; @@ -213,10 +222,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { @SuppressWarnings("unchecked") @Override - public WindowStoreIterator<V> fetch(K key, long timestamp) { - long timeFrom = Math.max(0L, timestamp - windowBefore); - long timeTo = Math.max(0L, timestamp + windowAfter); - + public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) { long segFrom = segmentId(timeFrom); long segTo = segmentId(Math.max(0L, timeTo)); http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java index 73814ef..fcdcb9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java @@ -32,18 +32,16 @@ import org.apache.kafka.streams.processor.StateStoreSupplier; public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier { private final String name; - private final long windowBefore; - private final long windowAfter; private final long retentionPeriod; + private final boolean retainDuplicates; private final int numSegments; private final Serdes serdes; private final Time time; - public RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) { + public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) { this.name = name; - this.windowBefore = windowBefore; - this.windowAfter = windowAfter; this.retentionPeriod = retentionPeriod; + this.retainDuplicates = retainDuplicates; this.numSegments = numSegments; this.serdes = serdes; this.time = time; @@ -54,7 +52,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier { } public StateStore get() { - return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, windowBefore, windowAfter, retentionPeriod, numSegments, serdes), "rocksdb-window", time); + return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index 344aa91..b17d889 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -25,9 +25,11 @@ public interface WindowStore<K, V> extends StateStore { void put(K key, V value); - byte[] putAndReturnInternalKey(K key, V value); + void put(K key, V value, long timestamp); - WindowStoreIterator<V> fetch(K key, long timestamp); + byte[] putAndReturnInternalKey(K key, V value, long timestamp); + + WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo); void putInternal(byte[] binaryKey, byte[] binaryValue); http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java index e57a00f..55d1ac3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java @@ -19,8 +19,10 @@ package org.apache.kafka.streams.state; +import org.apache.kafka.streams.kstream.KeyValue; + import java.util.Iterator; -public interface WindowStoreIterator<E> extends Iterator<E> { +public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>> { void close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java new file mode 100644 index 0000000..ba596a9 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.AggregatorSupplier; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; + +import static org.junit.Assert.assertEquals; + +public class KStreamAggregateTest { + + private final Serializer<String> strSerializer = new StringSerializer(); + private final Deserializer<String> strDeserializer = new StringDeserializer(); + + private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> { + + private class StringCanonizer implements Aggregator<String, String, String> { + + @Override + public String initialValue() { + return "0"; + } + + @Override + public String add(String aggKey, String value, String aggregate) { + return aggregate + "+" + value; + } + + @Override + public String remove(String aggKey, String value, String aggregate) { + return aggregate + "-" + value; + } + + @Override + public String merge(String aggr1, String aggr2) { + return "(" + aggr1 + ") + (" + aggr2 + ")"; + } + } + + @Override + public Aggregator<String, String, String> get() { + return new StringCanonizer(); + } + } + + @Test + public void testAggBasic() throws Exception { + final File baseDir = Files.createTempDirectory("test").toFile(); + + try { + final KStreamBuilder builder = new KStreamBuilder(); + String topic1 = "topic1"; + + KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); + KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizeSupplier(), + HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + strSerializer, + strSerializer, + strDeserializer, + strDeserializer); + + MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + + driver.setTime(0L); + driver.process(topic1, "A", "1"); + driver.setTime(1L); + driver.process(topic1, "B", "2"); + driver.setTime(2L); + driver.process(topic1, "C", "3"); + driver.setTime(3L); + driver.process(topic1, "D", "4"); + driver.setTime(4L); + driver.process(topic1, "A", "1"); + + driver.setTime(5L); + driver.process(topic1, "A", "1"); + driver.setTime(6L); + driver.process(topic1, "B", "2"); + driver.setTime(7L); + driver.process(topic1, "D", "4"); + driver.setTime(8L); + driver.process(topic1, "B", "2"); + driver.setTime(9L); + driver.process(topic1, "C", "3"); + + driver.setTime(10L); + driver.process(topic1, "A", "1"); + driver.setTime(11L); + driver.process(topic1, "B", "2"); + driver.setTime(12L); + driver.process(topic1, "D", "4"); + driver.setTime(13L); + driver.process(topic1, "B", "2"); + driver.setTime(14L); + driver.process(topic1, "C", "3"); + + assertEquals(Utils.mkList( + "[A@0]:0+1", + "[B@0]:0+2", + "[C@0]:0+3", + "[D@0]:0+4", + "[A@0]:0+1+1", + + "[A@0]:0+1+1+1", "[A@5]:0+1", + "[B@0]:0+2+2", "[B@5]:0+2", + "[D@0]:0+4+4", "[D@5]:0+4", + "[B@0]:0+2+2+2", "[B@5]:0+2+2", + "[C@0]:0+3+3", "[C@5]:0+3", + + "[A@5]:0+1+1", "[A@10]:0+1", + "[B@5]:0+2+2+2", "[B@10]:0+2", + "[D@5]:0+4+4", "[D@10]:0+4", + "[B@5]:0+2+2+2+2", "[B@10]:0+2+2", + "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed); + + } finally { + Utils.delete(baseDir); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 90341a8..e763fd2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -125,7 +125,7 @@ public class KStreamKStreamJoinTest { // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } // w2 = { 0:Y0, 1:Y1 } // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); @@ -137,7 +137,7 @@ public class KStreamKStreamJoinTest { // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); @@ -149,7 +149,7 @@ public class KStreamKStreamJoinTest { // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 } for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index 189cf9d..b5037ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -47,7 +47,7 @@ public class KTableAggregateTest { @Override public String initialValue() { - return ""; + return "0"; } @Override @@ -106,14 +106,14 @@ public class KTableAggregateTest { driver.process(topic1, "C", "8"); assertEquals(Utils.mkList( - "A:+1", - "B:+2", - "A:+1+3", "A:+1+3-1", - "B:+2+4", "B:+2+4-2", - "C:+5", - "D:+6", - "B:+2+4-2+7", "B:+2+4-2+7-4", - "C:+5+8", "C:+5+8-5"), proc2.processed); + "A:0+1", + "B:0+2", + "A:0+1+3", "A:0+1+3-1", + "B:0+2+4", "B:0+2+4-2", + "C:0+5", + "D:0+6", + "B:0+2+4-2+7", "B:0+2+4-2+7-4", + "C:0+5+8", "C:0+5+8-5"), proc2.processed); } finally { Utils.delete(baseDir); http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java new file mode 100644 index 0000000..f9b6ba5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.TumblingWindows; +import org.apache.kafka.streams.kstream.UnlimitedWindows; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class WindowsTest { + + @Test + public void hoppingWindows() { + + HoppingWindows windows = HoppingWindows.of("test").with(12L).every(5L); + + Map<Long, HoppingWindow> matched = windows.windowsFor(21L); + + assertEquals(3, matched.size()); + + assertEquals(new HoppingWindow(10L, 22L), matched.get(10L)); + assertEquals(new HoppingWindow(15L, 27L), matched.get(15L)); + assertEquals(new HoppingWindow(20L, 32L), matched.get(20L)); + } + + @Test + public void tumblineWindows() { + + TumblingWindows windows = TumblingWindows.of("test").with(12L); + + Map<Long, TumblingWindow> matched = windows.windowsFor(21L); + + assertEquals(1, matched.size()); + + assertEquals(new TumblingWindow(12L, 24L), matched.get(12L)); + } + + @Test + public void unlimitedWindows() { + + UnlimitedWindows windows = UnlimitedWindows.of("test").startOn(10L); + + Map<Long, UnlimitedWindow> matched = windows.windowsFor(21L); + + assertEquals(1, matched.size()); + + assertEquals(new UnlimitedWindow(10L), matched.get(10L)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java index 6bfddfe..fc7a4e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java @@ -53,13 +53,11 @@ public class RocksDBWindowStoreTest { private final long windowSize = 3; private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class); - - protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, long windowBefore, long windowAfter, Serdes<K, V> serdes) { - StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", windowBefore, windowAfter, retentionPeriod, numSegments, serdes, null); + protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) { + StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null); WindowStore<K, V> store = (WindowStore<K, V>) supplier.get(); store.init(context); return store; - } @Test @@ -83,7 +81,7 @@ public class RocksDBWindowStoreTest { byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes); + WindowStore<Integer, String> store = createWindowStore(context, serdes); try { long startTime = segmentSize - 4L; @@ -100,12 +98,12 @@ public class RocksDBWindowStoreTest { context.setTime(startTime + 5L); store.put(5, "five"); - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L))); + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize))); + assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize))); context.setTime(startTime + 3L); store.put(2, "two+1"); @@ -120,21 +118,21 @@ public class RocksDBWindowStoreTest { context.setTime(startTime + 8L); store.put(2, "two+6"); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L))); - assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L))); - assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize))); + assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize))); // Flush the store and verify all current entries were properly flushed ... store.flush(); @@ -179,7 +177,7 @@ public class RocksDBWindowStoreTest { byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, windowSize, 0, serdes); + WindowStore<Integer, String> store = createWindowStore(context, serdes); try { long startTime = segmentSize - 4L; @@ -196,12 +194,12 @@ public class RocksDBWindowStoreTest { context.setTime(startTime + 5L); store.put(5, "five"); - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L))); + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L))); + assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L))); context.setTime(startTime + 3L); store.put(2, "two+1"); @@ -216,21 +214,21 @@ public class RocksDBWindowStoreTest { context.setTime(startTime + 8L); store.put(2, "two+6"); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L))); - assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L))); - assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); + assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L))); + assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L))); // Flush the store and verify all current entries were properly flushed ... store.flush(); @@ -275,7 +273,7 @@ public class RocksDBWindowStoreTest { byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, 0, windowSize, serdes); + WindowStore<Integer, String> store = createWindowStore(context, serdes); try { long startTime = segmentSize - 4L; @@ -292,12 +290,12 @@ public class RocksDBWindowStoreTest { context.setTime(startTime + 5L); store.put(5, "five"); - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L))); + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize))); + assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize))); context.setTime(startTime + 3L); store.put(2, "two+1"); @@ -312,21 +310,21 @@ public class RocksDBWindowStoreTest { context.setTime(startTime + 8L); store.put(2, "two+6"); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L))); - assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L))); - assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize))); + assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize))); // Flush the store and verify all current entries were properly flushed ... store.flush(); @@ -371,14 +369,14 @@ public class RocksDBWindowStoreTest { byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes); + WindowStore<Integer, String> store = createWindowStore(context, serdes); try { long startTime = segmentSize - 4L; context.setTime(startTime); store.put(0, "zero"); - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime))); + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); context.setTime(startTime); store.put(0, "zero"); @@ -387,11 +385,11 @@ public class RocksDBWindowStoreTest { context.setTime(startTime); store.put(0, "zero++"); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L))); - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize))); // Flush the store and verify all current entries were properly flushed ... store.flush(); @@ -430,7 +428,7 @@ public class RocksDBWindowStoreTest { byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes); + WindowStore<Integer, String> store = createWindowStore(context, serdes); RocksDBWindowStore<Integer, String> inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); try { @@ -461,51 +459,52 @@ public class RocksDBWindowStoreTest { store.put(5, "five"); assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds()); - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5))); + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); context.setTime(startTime + incr * 6); store.put(6, "six"); assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds()); - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6))); + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + context.setTime(startTime + incr * 7); store.put(7, "seven"); assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds()); - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6))); - assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7))); + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); context.setTime(startTime + incr * 8); store.put(8, "eight"); assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds()); - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6))); - assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7))); - assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8))); + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); // check segment directories store.flush(); @@ -546,7 +545,7 @@ public class RocksDBWindowStoreTest { byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes); + WindowStore<Integer, String> store = createWindowStore(context, serdes); try { context.setTime(startTime); store.put(0, "zero"); @@ -595,7 +594,7 @@ public class RocksDBWindowStoreTest { byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes); + WindowStore<Integer, String> store = createWindowStore(context, serdes); RocksDBWindowStore<Integer, String> inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); @@ -604,15 +603,15 @@ public class RocksDBWindowStoreTest { assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds()); - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6))); - assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7))); - assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8))); + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); // check segment directories store.flush(); @@ -633,7 +632,7 @@ public class RocksDBWindowStoreTest { private <E> List<E> toList(WindowStoreIterator<E> iterator) { ArrayList<E> list = new ArrayList<>(); while (iterator.hasNext()) { - list.add(iterator.next()); + list.add(iterator.next().value); } return list; }
