Repository: kafka Updated Branches: refs/heads/trunk e4ef8e664 -> 959cf09e8
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java new file mode 100644 index 0000000..80ad67f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -0,0 +1,676 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.state.Entry; +import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.WindowStoreUtil; +import org.apache.kafka.test.MockProcessorContext; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RocksDBWindowStoreTest { + + private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); + private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer(); + private final int numSegments = 3; + private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL; + private final long retentionPeriod = segmentSize * (numSegments - 1); + private final long windowSize = 3; + private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class); + + protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) { + StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null); + WindowStore<K, V> store = (WindowStore<K, V>) supplier.get(); + store.init(context); + return store; + } + + @Test + public void testPutAndFetch() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); + RecordCollector recordCollector = new RecordCollector(producer) { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + changeLog.add(new Entry<>( + keySerializer.serialize(record.topic(), record.key()), + valueSerializer.serialize(record.topic(), record.value())) + ); + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, + recordCollector); + + WindowStore<Integer, String> store = createWindowStore(context, serdes); + try { + long startTime = segmentSize - 4L; + + context.setTime(startTime + 0L); + store.put(0, "zero"); + context.setTime(startTime + 1L); + store.put(1, "one"); + context.setTime(startTime + 2L); + store.put(2, "two"); + context.setTime(startTime + 3L); + // (3, "three") is not put + context.setTime(startTime + 4L); + store.put(4, "four"); + context.setTime(startTime + 5L); + store.put(5, "five"); + + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize))); + assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize))); + + context.setTime(startTime + 3L); + store.put(2, "two+1"); + context.setTime(startTime + 4L); + store.put(2, "two+2"); + context.setTime(startTime + 5L); + store.put(2, "two+3"); + context.setTime(startTime + 6L); + store.put(2, "two+4"); + context.setTime(startTime + 7L); + store.put(2, "two+5"); + context.setTime(startTime + 8L); + store.put(2, "two+6"); + + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize))); + assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize))); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + + Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); + + assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); + assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); + assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); + assertNull(entriesByKey.get(3)); + assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); + assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); + assertNull(entriesByKey.get(6)); + + } finally { + store.close(); + } + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testPutAndFetchBefore() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); + RecordCollector recordCollector = new RecordCollector(producer) { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + changeLog.add(new Entry<>( + keySerializer.serialize(record.topic(), record.key()), + valueSerializer.serialize(record.topic(), record.value())) + ); + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, + recordCollector); + + WindowStore<Integer, String> store = createWindowStore(context, serdes); + try { + long startTime = segmentSize - 4L; + + context.setTime(startTime + 0L); + store.put(0, "zero"); + context.setTime(startTime + 1L); + store.put(1, "one"); + context.setTime(startTime + 2L); + store.put(2, "two"); + context.setTime(startTime + 3L); + // (3, "three") is not put + context.setTime(startTime + 4L); + store.put(4, "four"); + context.setTime(startTime + 5L); + store.put(5, "five"); + + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L))); + assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L))); + + context.setTime(startTime + 3L); + store.put(2, "two+1"); + context.setTime(startTime + 4L); + store.put(2, "two+2"); + context.setTime(startTime + 5L); + store.put(2, "two+3"); + context.setTime(startTime + 6L); + store.put(2, "two+4"); + context.setTime(startTime + 7L); + store.put(2, "two+5"); + context.setTime(startTime + 8L); + store.put(2, "two+6"); + + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); + assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L))); + assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L))); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + + Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); + + assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); + assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); + assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); + assertNull(entriesByKey.get(3)); + assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); + assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); + assertNull(entriesByKey.get(6)); + + } finally { + store.close(); + } + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testPutAndFetchAfter() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); + RecordCollector recordCollector = new RecordCollector(producer) { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + changeLog.add(new Entry<>( + keySerializer.serialize(record.topic(), record.key()), + valueSerializer.serialize(record.topic(), record.value())) + ); + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, + recordCollector); + + WindowStore<Integer, String> store = createWindowStore(context, serdes); + try { + long startTime = segmentSize - 4L; + + context.setTime(startTime + 0L); + store.put(0, "zero"); + context.setTime(startTime + 1L); + store.put(1, "one"); + context.setTime(startTime + 2L); + store.put(2, "two"); + context.setTime(startTime + 3L); + // (3, "three") is not put + context.setTime(startTime + 4L); + store.put(4, "four"); + context.setTime(startTime + 5L); + store.put(5, "five"); + + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize))); + assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize))); + + context.setTime(startTime + 3L); + store.put(2, "two+1"); + context.setTime(startTime + 4L); + store.put(2, "two+2"); + context.setTime(startTime + 5L); + store.put(2, "two+3"); + context.setTime(startTime + 6L); + store.put(2, "two+4"); + context.setTime(startTime + 7L); + store.put(2, "two+5"); + context.setTime(startTime + 8L); + store.put(2, "two+6"); + + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); + assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize))); + assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize))); + assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize))); + assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize))); + assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize))); + assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize))); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + + Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); + + assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); + assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); + assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); + assertNull(entriesByKey.get(3)); + assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); + assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); + assertNull(entriesByKey.get(6)); + + } finally { + store.close(); + } + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testPutSameKeyTimestamp() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); + RecordCollector recordCollector = new RecordCollector(producer) { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + changeLog.add(new Entry<>( + keySerializer.serialize(record.topic(), record.key()), + valueSerializer.serialize(record.topic(), record.value())) + ); + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, + recordCollector); + + WindowStore<Integer, String> store = createWindowStore(context, serdes); + try { + long startTime = segmentSize - 4L; + + context.setTime(startTime); + store.put(0, "zero"); + + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + + context.setTime(startTime); + store.put(0, "zero"); + context.setTime(startTime); + store.put(0, "zero+"); + context.setTime(startTime); + store.put(0, "zero++"); + + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize))); + assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize))); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + + Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); + + assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0)); + + } finally { + store.close(); + } + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testRolling() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); + RecordCollector recordCollector = new RecordCollector(producer) { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + changeLog.add(new Entry<>( + keySerializer.serialize(record.topic(), record.key()), + valueSerializer.serialize(record.topic(), record.value())) + ); + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, + recordCollector); + + WindowStore<Integer, String> store = createWindowStore(context, serdes); + RocksDBWindowStore<Integer, String> inner = + (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); + try { + long startTime = segmentSize * 2; + long incr = segmentSize / 2; + + context.setTime(startTime); + store.put(0, "zero"); + assertEquals(Utils.mkSet(2L), inner.segmentIds()); + + context.setTime(startTime + incr); + store.put(1, "one"); + assertEquals(Utils.mkSet(2L), inner.segmentIds()); + + context.setTime(startTime + incr * 2); + store.put(2, "two"); + assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds()); + + context.setTime(startTime + incr * 3); + // (3, "three") is not put + assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds()); + + context.setTime(startTime + incr * 4); + store.put(4, "four"); + assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds()); + + context.setTime(startTime + incr * 5); + store.put(5, "five"); + assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds()); + + assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + + context.setTime(startTime + incr * 6); + store.put(6, "six"); + assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds()); + + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + + + context.setTime(startTime + incr * 7); + store.put(7, "seven"); + assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds()); + + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + + context.setTime(startTime + incr * 8); + store.put(8, "eight"); + assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds()); + + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); + + // check segment directories + store.flush(); + assertEquals( + Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)), + segmentDirs(baseDir) + ); + } finally { + store.close(); + } + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testRestore() throws IOException { + final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + long startTime = segmentSize * 2; + long incr = segmentSize / 2; + + File baseDir = Files.createTempDirectory("test").toFile(); + try { + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); + RecordCollector recordCollector = new RecordCollector(producer) { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + changeLog.add(new Entry<>( + keySerializer.serialize(record.topic(), record.key()), + valueSerializer.serialize(record.topic(), record.value())) + ); + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, + recordCollector); + + WindowStore<Integer, String> store = createWindowStore(context, serdes); + try { + context.setTime(startTime); + store.put(0, "zero"); + context.setTime(startTime + incr); + store.put(1, "one"); + context.setTime(startTime + incr * 2); + store.put(2, "two"); + context.setTime(startTime + incr * 3); + store.put(3, "three"); + context.setTime(startTime + incr * 4); + store.put(4, "four"); + context.setTime(startTime + incr * 5); + store.put(5, "five"); + context.setTime(startTime + incr * 6); + store.put(6, "six"); + context.setTime(startTime + incr * 7); + store.put(7, "seven"); + context.setTime(startTime + incr * 8); + store.put(8, "eight"); + store.flush(); + + } finally { + store.close(); + } + + + } finally { + Utils.delete(baseDir); + } + + File baseDir2 = Files.createTempDirectory("test").toFile(); + try { + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); + RecordCollector recordCollector = new RecordCollector(producer) { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + changeLog.add(new Entry<>( + keySerializer.serialize(record.topic(), record.key()), + valueSerializer.serialize(record.topic(), record.value())) + ); + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, + recordCollector); + + WindowStore<Integer, String> store = createWindowStore(context, serdes); + RocksDBWindowStore<Integer, String> inner = + (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); + + try { + context.restore("window", changeLog); + + assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds()); + + assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); + assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); + assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); + assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); + assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); + assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); + assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); + + // check segment directories + store.flush(); + assertEquals( + Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)), + segmentDirs(baseDir) + ); + } finally { + store.close(); + } + + + } finally { + Utils.delete(baseDir2); + } + } + + private <E> List<E> toList(WindowStoreIterator<E> iterator) { + ArrayList<E> list = new ArrayList<>(); + while (iterator.hasNext()) { + list.add(iterator.next().value); + } + return list; + } + + private Set<String> segmentDirs(File baseDir) { + File rocksDbDir = new File(baseDir, "rocksdb"); + String[] subdirs = rocksDbDir.list(); + + HashSet<String> set = new HashSet<>(); + + for (String subdir : subdirs) { + if (subdir.startsWith("window-")) + set.add(subdir.substring(7)); + } + return set; + } + + private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) { + HashMap<Integer, Set<String>> entriesByKey = new HashMap<>(); + + for (Entry<byte[], byte[]> entry : changeLog) { + long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key()); + Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes); + String value = entry.value() == null ? null : serdes.valueFrom(entry.value()); + + Set<String> entries = entriesByKey.get(key); + if (entries == null) { + entries = new HashSet<>(); + entriesByKey.put(key, entries); + } + entries.add(value + "@" + (timestamp - startTime)); + } + + return entriesByKey; + } +}
