Repository: kafka
Updated Branches:
  refs/heads/trunk e4ef8e664 -> 959cf09e8


http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
new file mode 100644
index 0000000..80ad67f
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -0,0 +1,676 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.WindowStoreUtil;
+import org.apache.kafka.test.MockProcessorContext;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RocksDBWindowStoreTest {
+
+    private final ByteArraySerializer byteArraySerializer = new 
ByteArraySerializer();
+    private final ByteArrayDeserializer byteArrayDeserializer = new 
ByteArrayDeserializer();
+    private final int numSegments = 3;
+    private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
+    private final long retentionPeriod = segmentSize * (numSegments - 1);
+    private final long windowSize = 3;
+    private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", 
Integer.class, String.class);
+
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext 
context, Serdes<K, V> serdes) {
+        StateStoreSupplier supplier = new 
RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, 
serdes, null);
+        WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
+    }
+
+    @Test
+    public void testPutAndFetch() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, 
byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, 
Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), 
record.key()),
+                                    valueSerializer.serialize(record.topic(), 
record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, 
byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, 
serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime + 0L);
+                store.put(0, "zero");
+                context.setTime(startTime + 1L);
+                store.put(1, "one");
+                context.setTime(startTime + 2L);
+                store.put(2, "two");
+                context.setTime(startTime + 3L);
+                // (3, "three") is not put
+                context.setTime(startTime + 4L);
+                store.put(4, "four");
+                context.setTime(startTime + 5L);
+                store.put(5, "five");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, 
startTime + 0L - windowSize, startTime + 0L + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, 
startTime + 1L - windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, 
startTime + 2L - windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 
3L - windowSize, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + 4L - windowSize, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + 5L - windowSize, startTime + 5L + windowSize)));
+
+                context.setTime(startTime + 3L);
+                store.put(2, "two+1");
+                context.setTime(startTime + 4L);
+                store.put(2, "two+2");
+                context.setTime(startTime + 5L);
+                store.put(2, "two+3");
+                context.setTime(startTime + 6L);
+                store.put(2, "two+4");
+                context.setTime(startTime + 7L);
+                store.put(2, "two+5");
+                context.setTime(startTime + 8L);
+                store.put(2, "two+6");
+
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 
2L - windowSize, startTime - 2L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, 
startTime - 1L - windowSize, startTime - 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1"), 
toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), 
toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + 
windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), 
toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + 
windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", 
"two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + 
windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", 
"two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime 
+ 4L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", 
"two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, 
startTime + 5L + windowSize)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", 
"two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime 
+ 6L + windowSize)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", 
"two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + 
windowSize)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), 
toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + 
windowSize)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), 
toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + 
windowSize)));
+                assertEquals(Utils.mkList("two+5", "two+6"), 
toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + 
windowSize)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, 
startTime + 11L - windowSize, startTime + 11L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
12L - windowSize, startTime + 12L + windowSize)));
+
+                // Flush the store and verify all current entries were 
properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = 
entriesByKey(changeLog, startTime);
+
+                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", 
"two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+                assertNull(entriesByKey.get(3));
+                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+                assertNull(entriesByKey.get(6));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testPutAndFetchBefore() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, 
byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, 
Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), 
record.key()),
+                                    valueSerializer.serialize(record.topic(), 
record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, 
byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, 
serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime + 0L);
+                store.put(0, "zero");
+                context.setTime(startTime + 1L);
+                store.put(1, "one");
+                context.setTime(startTime + 2L);
+                store.put(2, "two");
+                context.setTime(startTime + 3L);
+                // (3, "three") is not put
+                context.setTime(startTime + 4L);
+                store.put(4, "four");
+                context.setTime(startTime + 5L);
+                store.put(5, "five");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, 
startTime + 0L - windowSize, startTime + 0L)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, 
startTime + 1L - windowSize, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, 
startTime + 2L - windowSize, startTime + 2L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 
3L - windowSize, startTime + 3L)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + 4L - windowSize, startTime + 4L)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + 5L - windowSize, startTime + 5L)));
+
+                context.setTime(startTime + 3L);
+                store.put(2, "two+1");
+                context.setTime(startTime + 4L);
+                store.put(2, "two+2");
+                context.setTime(startTime + 5L);
+                store.put(2, "two+3");
+                context.setTime(startTime + 6L);
+                store.put(2, "two+4");
+                context.setTime(startTime + 7L);
+                store.put(2, "two+5");
+                context.setTime(startTime + 8L);
+                store.put(2, "two+6");
+
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 
1L - windowSize, startTime - 1L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
0L - windowSize, startTime + 0L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
1L - windowSize, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, 
startTime + 2L - windowSize, startTime + 2L)));
+                assertEquals(Utils.mkList("two", "two+1"), 
toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), 
toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), 
toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), 
toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), 
toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), 
toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), 
toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
+                assertEquals(Utils.mkList("two+5", "two+6"), 
toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, 
startTime + 11L - windowSize, startTime + 11L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
12L - windowSize, startTime + 12L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
13L - windowSize, startTime + 13L)));
+
+                // Flush the store and verify all current entries were 
properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = 
entriesByKey(changeLog, startTime);
+
+                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", 
"two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+                assertNull(entriesByKey.get(3));
+                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+                assertNull(entriesByKey.get(6));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testPutAndFetchAfter() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, 
byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, 
Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), 
record.key()),
+                                    valueSerializer.serialize(record.topic(), 
record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, 
byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, 
serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime + 0L);
+                store.put(0, "zero");
+                context.setTime(startTime + 1L);
+                store.put(1, "one");
+                context.setTime(startTime + 2L);
+                store.put(2, "two");
+                context.setTime(startTime + 3L);
+                // (3, "three") is not put
+                context.setTime(startTime + 4L);
+                store.put(4, "four");
+                context.setTime(startTime + 5L);
+                store.put(5, "five");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, 
startTime + 0L, startTime + 0L + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, 
startTime + 1L, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, 
startTime + 2L, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 
3L, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + 4L, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + 5L, startTime + 5L + windowSize)));
+
+                context.setTime(startTime + 3L);
+                store.put(2, "two+1");
+                context.setTime(startTime + 4L);
+                store.put(2, "two+2");
+                context.setTime(startTime + 5L);
+                store.put(2, "two+3");
+                context.setTime(startTime + 6L);
+                store.put(2, "two+4");
+                context.setTime(startTime + 7L);
+                store.put(2, "two+5");
+                context.setTime(startTime + 8L);
+                store.put(2, "two+6");
+
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 
2L, startTime - 2L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, 
startTime - 1L, startTime - 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1"), 
toList(store.fetch(2, startTime, startTime + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), 
toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), 
toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), 
toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), 
toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), 
toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), 
toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
+                assertEquals(Utils.mkList("two+5", "two+6"), 
toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, 
startTime + 8L, startTime + 8L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
9L, startTime + 9L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
10L, startTime + 10L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
11L, startTime + 11L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
12L, startTime + 12L + windowSize)));
+
+                // Flush the store and verify all current entries were 
properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = 
entriesByKey(changeLog, startTime);
+
+                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", 
"two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+                assertNull(entriesByKey.get(3));
+                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+                assertNull(entriesByKey.get(6));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testPutSameKeyTimestamp() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, 
byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, 
Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), 
record.key()),
+                                    valueSerializer.serialize(record.topic(), 
record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, 
byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, 
serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime);
+                store.put(0, "zero");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, 
startTime - windowSize, startTime + windowSize)));
+
+                context.setTime(startTime);
+                store.put(0, "zero");
+                context.setTime(startTime);
+                store.put(0, "zero+");
+                context.setTime(startTime);
+                store.put(0, "zero++");
+
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), 
toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), 
toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + 
windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), 
toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + 
windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), 
toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + 
windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 
4L - windowSize, startTime + 4L + windowSize)));
+
+                // Flush the store and verify all current entries were 
properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = 
entriesByKey(changeLog, startTime);
+
+                assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", 
"zero++@0"), entriesByKey.get(0));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testRolling() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, 
byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, 
Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), 
record.key()),
+                                    valueSerializer.serialize(record.topic(), 
record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, 
byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, 
serdes);
+            RocksDBWindowStore<Integer, String> inner =
+                    (RocksDBWindowStore<Integer, String>) 
((MeteredWindowStore<Integer, String>) store).inner();
+            try {
+                long startTime = segmentSize * 2;
+                long incr = segmentSize / 2;
+
+                context.setTime(startTime);
+                store.put(0, "zero");
+                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+                context.setTime(startTime + incr);
+                store.put(1, "one");
+                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 2);
+                store.put(2, "two");
+                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 3);
+                // (3, "three") is not put
+                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 4);
+                store.put(4, "four");
+                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 5);
+                store.put(5, "five");
+                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, 
startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, 
startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, 
startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 
incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+
+                context.setTime(startTime + incr * 6);
+                store.put(6, "six");
+                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - 
windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + 
incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, 
startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 
incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, 
startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+
+
+                context.setTime(startTime + incr * 7);
+                store.put(7, "seven");
+                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - 
windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + 
incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, 
startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 
incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, 
startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, 
startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+
+                context.setTime(startTime + incr * 8);
+                store.put(8, "eight");
+                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - 
windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + 
incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 
incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, 
startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, 
startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, 
startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+                // check segment directories
+                store.flush();
+                assertEquals(
+                        Utils.mkSet(inner.directorySuffix(4L), 
inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        segmentDirs(baseDir)
+                );
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testRestore() throws IOException {
+        final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+        long startTime = segmentSize * 2;
+        long incr = segmentSize / 2;
+
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, 
byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, 
Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), 
record.key()),
+                                    valueSerializer.serialize(record.topic(), 
record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, 
byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, 
serdes);
+            try {
+                context.setTime(startTime);
+                store.put(0, "zero");
+                context.setTime(startTime + incr);
+                store.put(1, "one");
+                context.setTime(startTime + incr * 2);
+                store.put(2, "two");
+                context.setTime(startTime + incr * 3);
+                store.put(3, "three");
+                context.setTime(startTime + incr * 4);
+                store.put(4, "four");
+                context.setTime(startTime + incr * 5);
+                store.put(5, "five");
+                context.setTime(startTime + incr * 6);
+                store.put(6, "six");
+                context.setTime(startTime + incr * 7);
+                store.put(7, "seven");
+                context.setTime(startTime + incr * 8);
+                store.put(8, "eight");
+                store.flush();
+
+            } finally {
+                store.close();
+            }
+
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+
+        File baseDir2 = Files.createTempDirectory("test").toFile();
+        try {
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, 
byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, 
Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), 
record.key()),
+                                    valueSerializer.serialize(record.topic(), 
record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, 
byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, 
serdes);
+            RocksDBWindowStore<Integer, String> inner =
+                    (RocksDBWindowStore<Integer, String>) 
((MeteredWindowStore<Integer, String>) store).inner();
+
+            try {
+                context.restore("window", changeLog);
+
+                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - 
windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + 
incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 
incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 
incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, 
startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, 
startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, 
startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, 
startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, 
startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+                // check segment directories
+                store.flush();
+                assertEquals(
+                        Utils.mkSet(inner.directorySuffix(4L), 
inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        segmentDirs(baseDir)
+                );
+            } finally {
+                store.close();
+            }
+
+
+        } finally {
+            Utils.delete(baseDir2);
+        }
+    }
+
+    private <E> List<E> toList(WindowStoreIterator<E> iterator) {
+        ArrayList<E> list = new ArrayList<>();
+        while (iterator.hasNext()) {
+            list.add(iterator.next().value);
+        }
+        return list;
+    }
+
+    private Set<String> segmentDirs(File baseDir) {
+        File rocksDbDir = new File(baseDir, "rocksdb");
+        String[] subdirs = rocksDbDir.list();
+
+        HashSet<String> set = new HashSet<>();
+
+        for (String subdir : subdirs) {
+            if (subdir.startsWith("window-"))
+            set.add(subdir.substring(7));
+        }
+        return set;
+    }
+
+    private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> 
changeLog, long startTime) {
+        HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
+
+        for (Entry<byte[], byte[]> entry : changeLog) {
+            long timestamp = 
WindowStoreUtil.timestampFromBinaryKey(entry.key());
+            Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), 
serdes);
+            String value = entry.value() == null ? null : 
serdes.valueFrom(entry.value());
+
+            Set<String> entries = entriesByKey.get(key);
+            if (entries == null) {
+                entries = new HashSet<>();
+                entriesByKey.put(key, entries);
+            }
+            entries.add(value + "@" + (timestamp - startTime));
+        }
+
+        return entriesByKey;
+    }
+}

Reply via email to