Repository: kafka Updated Branches: refs/heads/0.10.2 c9b9acf6a -> 6f72a5a53
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java index 84f1734..abaaffd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.state.WindowStore; @@ -58,23 +59,35 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V } public WindowStore get() { - final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore(name, retentionPeriod, numSegments, new WindowStoreKeySchema()); - if (!enableCaching) { - final RocksDBWindowStore<K, V> segmentedStore = new RocksDBWindowStore<>(name, retainDuplicates, keySerde, valueSerde, - logged ? new ChangeLoggingSegmentedBytesStore(bytesStore) - : bytesStore); - return new MeteredWindowStore<>(segmentedStore, "rocksdb-window", time); - } + return maybeWrapCaching( + maybeWrapLogged( + new RocksDBSegmentedBytesStore( + name, + retentionPeriod, + numSegments, + new WindowStoreKeySchema() + ))); - return new CachingWindowStore<>(new MeteredSegmentedBytesStore(logged ? 
new ChangeLoggingSegmentedBytesStore(bytesStore) - : bytesStore, - "rocksdb-window", - time), - keySerde, valueSerde, windowSize); } @Override public long retentionPeriod() { return retentionPeriod; } + + private SegmentedBytesStore maybeWrapLogged(final SegmentedBytesStore inner) { + if (!logged) { + return inner; + } + return new ChangeLoggingSegmentedBytesStore(inner); + } + + private WindowStore<K, V> maybeWrapCaching(final SegmentedBytesStore inner) { + final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(inner, "rocksdb-window", time); + if (!enableCaching) { + return new RocksDBWindowStore<>(metered, keySerde, valueSerde, retainDuplicates); + } + final RocksDBWindowStore<Bytes, byte[]> windowed = RocksDBWindowStore.bytesStore(metered, retainDuplicates); + return new CachingWindowStore<>(windowed, keySerde, valueSerde, windowSize); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java new file mode 100644 index 0000000..d76e8a4 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StateSerdes; + +import java.util.NoSuchElementException; + +class SerializedKeyValueIterator<K, V> implements KeyValueIterator<K, V> { + + private final KeyValueIterator<Bytes, byte[]> bytesIterator; + private final StateSerdes<K, V> serdes; + + SerializedKeyValueIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, + final StateSerdes<K, V> serdes) { + + this.bytesIterator = bytesIterator; + this.serdes = serdes; + } + + @Override + public void close() { + bytesIterator.close(); + } + + @Override + public K peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final Bytes bytes = bytesIterator.peekNextKey(); + return serdes.keyFrom(bytes.get()); + } + + @Override + public boolean hasNext() { + return bytesIterator.hasNext(); + } + + @Override + public KeyValue<K, V> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final KeyValue<Bytes, byte[]> next = bytesIterator.next(); + return KeyValue.pair(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value)); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove not supported by SerializedKeyValueIterator"); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java index 1ea6bef..074cf8a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java @@ -39,7 +39,10 @@ public class WindowStoreUtils { public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) { byte[] serializedKey = serdes.rawKey(key); + return toBinaryKey(serializedKey, timestamp, seqnum); + } + static byte[] toBinaryKey(byte[] serializedKey, final long timestamp, final int seqnum) { ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE); buf.put(serializedKey); buf.putLong(timestamp); http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java new file mode 100644 index 0000000..3d80b98 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; + +/** + * A storage engine wrapper for utilities like logging, caching, and metering. + */ +interface WrappedStateStore extends StateStore { + + /** + * Return the inner storage engine + * + * @return wrapped inner storage engine + */ + StateStore inner(); + + abstract class AbstractWrappedStateStore implements WrappedStateStore { + final StateStore innerState; + + AbstractWrappedStateStore(StateStore inner) { + this.innerState = inner; + } + + @Override + public void init(ProcessorContext context, StateStore root) { + innerState.init(context, root); + } + + @Override + public String name() { + return innerState.name(); + } + + @Override + public boolean persistent() { + return innerState.persistent(); + } + + @Override + public boolean isOpen() { + return innerState.isOpen(); + } + + void validateStoreOpen() { + if (!innerState.isOpen()) { + throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed."); + } + } + + @Override + public StateStore inner() { + if (innerState instanceof WrappedStateStore) { + return ((WrappedStateStore) innerState).inner(); + } + return innerState; + } + + @Override + public void flush() { + innerState.flush(); + } + + @Override + public void close() { + innerState.close(); + } + } + + +} 
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index 729e190..b6d8a97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.Initializer; @@ -40,9 +41,14 @@ import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; public class KGroupedStreamImplTest { @@ -337,4 +343,38 @@ public class KGroupedStreamImplTest { public void shouldNotAcceptNullStoreStoreSupplierNameWhenCountingSessionWindows() throws Exception { groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null); } -} + + @Test + public void shouldCountWindowed() throws Exception { + final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>(); + groupedStream.count( + TimeWindows.of(500L), + "aggregate-by-key-windowed") + .foreach(new ForeachAction<Windowed<String>, Long>() { + @Override + public void apply(final Windowed<String> key, final Long 
value) { + results.add(KeyValue.pair(key, value)); + } + }); + + final KStreamTestDriver driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), 0); + driver.setTime(0); + driver.process(TOPIC, "1", "A"); + driver.process(TOPIC, "2", "B"); + driver.process(TOPIC, "3", "C"); + driver.setTime(500); + driver.process(TOPIC, "1", "A"); + driver.process(TOPIC, "1", "A"); + driver.process(TOPIC, "2", "B"); + driver.process(TOPIC, "2", "B"); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L), + KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L), + KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L) + ))); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index c603aa0..5035f70 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -59,7 +59,8 @@ public class CachingSessionStoreTest { @Before public void setUp() throws Exception { underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, new SessionKeySchema()); - cachingStore = new CachingSessionStore<>(underlying, + final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, 
Serdes.Bytes(), Serdes.ByteArray()); + cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.Long()); cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 37fc9a0..1de1002 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -54,13 +54,15 @@ public class CachingWindowStoreTest { private String topic; private static final long DEFAULT_TIMESTAMP = 10L; private WindowStoreKeySchema keySchema; + private RocksDBWindowStore<Bytes, byte[]> windowStore; @Before public void setUp() throws Exception { keySchema = new WindowStoreKeySchema(); underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema); + windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false); cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>(); - cachingStore = new CachingWindowStore<>(underlying, + cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(), Serdes.String(), WINDOW_SIZE); @@ -72,6 +74,7 @@ public class CachingWindowStoreTest { cachingStore.init(context, cachingStore); } + @Test public void shouldPutFetchFromCache() throws Exception { cachingStore.put("a", "a"); http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java 
---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java new file mode 100644 index 0000000..82fb831 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.test.InMemoryKeyValueStore; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +public class ChangeLoggingKeyValueBytesStoreTest { + + private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv"); + private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner); + private final Map sent = new HashMap<>(); + private final Bytes hi = Bytes.wrap("hi".getBytes()); + private final Bytes hello = Bytes.wrap("hello".getBytes()); + private final byte[] there = "there".getBytes(); + private final byte[] world = "world".getBytes(); + + @Before + public void before() { + final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + sent.put(record.key(), record.value()); + } + }; + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.Long(), + collector, + new ThreadCache("testCache", 0, new MockStreamsMetrics(new 
Metrics()))); + context.setTime(0); + store.init(context, store); + } + + @Test + public void shouldWriteKeyValueBytesToInnerStoreOnPut() throws Exception { + store.put(hi, there); + assertThat(inner.get(hi), equalTo(there)); + } + + @Test + public void shouldLogChangeOnPut() throws Exception { + store.put(hi, there); + assertThat((byte[]) sent.get(hi), equalTo(there)); + } + + @Test + public void shouldWriteAllKeyValueToInnerStoreOnPutAll() throws Exception { + store.putAll(Arrays.asList(KeyValue.pair(hi, there), + KeyValue.pair(hello, world))); + assertThat(inner.get(hi), equalTo(there)); + assertThat(inner.get(hello), equalTo(world)); + } + + @Test + public void shouldLogChangesOnPutAll() throws Exception { + store.putAll(Arrays.asList(KeyValue.pair(hi, there), + KeyValue.pair(hello, world))); + assertThat((byte[]) sent.get(hi), equalTo(there)); + assertThat((byte[]) sent.get(hello), equalTo(world)); + } + + @Test + public void shouldPutNullOnDelete() throws Exception { + store.put(hi, there); + store.delete(hi); + assertThat(inner.get(hi), nullValue()); + } + + @Test + public void shouldReturnOldValueOnDelete() throws Exception { + store.put(hi, there); + assertThat(store.delete(hi), equalTo(there)); + } + + @Test + public void shouldLogKeyNullOnDelete() throws Exception { + store.put(hi, there); + store.delete(hi); + assertThat(sent.get(hi), nullValue()); + } + + @Test + public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() throws Exception { + store.putIfAbsent(hi, there); + assertThat(inner.get(hi), equalTo(there)); + } + + @Test + public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() throws Exception { + store.put(hi, there); + store.putIfAbsent(hi, world); + assertThat(inner.get(hi), equalTo(there)); + } + + @Test + public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() throws Exception { + store.putIfAbsent(hi, there); + assertThat((byte[]) sent.get(hi), equalTo(there)); + } + + @Test + public void 
shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() throws Exception { + store.put(hi, there); + store.putIfAbsent(hi, world); + assertThat((byte[]) sent.get(hi), equalTo(there)); + } + + @Test + public void shouldReturnCurrentValueOnPutIfAbsent() throws Exception { + store.put(hi, there); + assertThat(store.putIfAbsent(hi, world), equalTo(there)); + } + + @Test + public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() throws Exception { + assertThat(store.putIfAbsent(hi, there), is(nullValue())); + } + + @Test + public void shouldReturnValueOnGetWhenExists() throws Exception { + store.put(hello, world); + assertThat(store.get(hello), equalTo(world)); + } + + @Test + public void shouldReturnNullOnGetWhenDoesntExist() throws Exception { + assertThat(store.get(hello), is(nullValue())); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java new file mode 100644 index 0000000..8815c5a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.test.InMemoryKeyValueStore; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; + +public class ChangeLoggingKeyValueStoreTest { + + private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv"); + private final Serde<String> keySerde = Serdes.String(); + private final Serde<String> valueSerde = Serdes.String(); + private final ChangeLoggingKeyValueStore<String, String> store + = new ChangeLoggingKeyValueStore<>(inner, keySerde, valueSerde); + private final Map sent = new HashMap<>(); + 
private final String hi = "hi"; + private final Bytes hiBytes = Bytes.wrap(hi.getBytes()); + private final String there = "there"; + private final byte[] thereBytes = "there".getBytes(); + private final String hello = "hello"; + private final String world = "world"; + + @Before + public void before() { + final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + sent.put(record.key(), record.value()); + } + }; + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.Long(), + collector, + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + context.setTime(0); + store.init(context, store); + } + + @Test + public void shouldWriteKeyValueBytesToInnerStoreOnPut() throws Exception { + store.put(hi, there); + assertThat(deserializedValueFromInner(hi), equalTo(there)); + } + + @Test + public void shouldLogChangeOnPut() throws Exception { + store.put(hi, there); + assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes)); + } + + @Test + public void shouldWriteAllKeyValueToInnerStoreOnPutAll() throws Exception { + store.putAll(Arrays.asList(KeyValue.pair(hello, world), + KeyValue.pair(hi, there))); + assertThat(deserializedValueFromInner(hello), equalTo(world)); + assertThat(deserializedValueFromInner(hi), equalTo(there)); + } + + @Test + public void shouldLogChangesOnPutAll() throws Exception { + store.putAll(Arrays.asList(KeyValue.pair(hi, there), + KeyValue.pair(hello, world))); + assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes)); + assertThat((byte[]) sent.get(Bytes.wrap(hello.getBytes())), equalTo(world.getBytes())); + } + + @Test + public void shouldPutNullOnDelete() throws Exception { + store.put(hi, there); + store.delete(hi); + assertThat(inner.get(hiBytes), nullValue()); + } + + @Test + public void 
shouldReturnOldValueOnDelete() throws Exception { + store.put(hi, there); + assertThat(store.delete(hi), equalTo(there)); + } + + @Test + public void shouldReturnNullOnDeleteIfNoOldValue() throws Exception { + assertThat(store.delete(hi), is(nullValue())); + } + + @Test + public void shouldLogKeyNullOnDelete() throws Exception { + store.put(hi, there); + store.delete(hi); + assertThat(sent.get(hi), nullValue()); + } + + @Test + public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() throws Exception { + store.putIfAbsent(hi, there); + assertThat(inner.get(hiBytes), equalTo(thereBytes)); + } + + @Test + public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() throws Exception { + store.put(hi, there); + store.putIfAbsent(hi, world); + assertThat(inner.get(hiBytes), equalTo(thereBytes)); + } + + @Test + public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() throws Exception { + store.putIfAbsent(hi, there); + assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes)); + } + + @Test + public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() throws Exception { + store.put(hi, there); + store.putIfAbsent(hi, world); + assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes)); + } + + @Test + public void shouldReturnCurrentValueOnPutIfAbsent() throws Exception { + store.put(hi, there); + assertThat(store.putIfAbsent(hi, world), equalTo(there)); + } + + @Test + public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() throws Exception { + assertThat(store.putIfAbsent(hi, there), is(nullValue())); + } + + @Test + public void shouldQueryRange() throws Exception { + store.put(hello, world); + store.put(hi, there); + store.put("zooom", "home"); + final KeyValueIterator<String, String> range = store.range(hello, hi); + assertThat(range.next(), equalTo(KeyValue.pair(hello, world))); + assertThat(range.next(), equalTo(KeyValue.pair(hi, there))); + assertFalse(range.hasNext()); + } + + @Test + public void 
shouldReturnAllKeyValues() throws Exception { + store.put(hello, world); + store.put(hi, there); + final String zooom = "zooom"; + final String home = "home"; + store.put(zooom, home); + final KeyValueIterator<String, String> all = store.all(); + assertThat(all.next(), equalTo(KeyValue.pair(hello, world))); + assertThat(all.next(), equalTo(KeyValue.pair(hi, there))); + assertThat(all.next(), equalTo(KeyValue.pair(zooom, home))); + assertFalse(all.hasNext()); + } + + @Test + public void shouldReturnValueOnGetWhenExists() throws Exception { + store.put(hello, world); + assertThat(store.get(hello), equalTo(world)); + } + + @Test + public void shouldReturnNullOnGetWhenDoesntExist() throws Exception { + assertThat(store.get(hello), is(nullValue())); + } + + private String deserializedValueFromInner(final String key) { + return valueSerde.deserializer().deserialize("blah", inner.get(Bytes.wrap(key.getBytes()))); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java index 50845e8..0ebdd5c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.test.InMemoryKeyValueStore; import org.junit.Before; import org.junit.Test; @@ -37,7 +38,7 @@ public class DelegatingPeekingKeyValueIteratorTest { } @Test - public void shouldPeekNext() throws Exception { + public void 
shouldPeekNextKey() throws Exception { store.put("A", "A"); final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all()); assertEquals("A", peekingIterator.peekNextKey()); @@ -46,6 +47,15 @@ public class DelegatingPeekingKeyValueIteratorTest { } @Test + public void shouldPeekNext() throws Exception { + store.put("A", "A"); + final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all()); + assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext()); + assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext()); + assertTrue(peekingIterator.hasNext()); + } + + @Test public void shouldPeekAndIterate() throws Exception { final String[] kvs = {"a", "b", "c", "d", "e", "f"}; for (String kv : kvs) { http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java new file mode 100644 index 0000000..e7c2eb3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionKeySerde; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.KeyValueIteratorStub; +import org.junit.Test; + +import java.util.Collections; +import java.util.Iterator; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MergedSortedCacheSessionStoreIteratorTest { + + private final String storeKey = "a"; + private final String cacheKey = "b"; + + private final SessionWindow storeWindow = new SessionWindow(0, 1); + private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton( + KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), storeWindow), storeKey.getBytes())).iterator(); + private final SessionWindow cacheWindow = new SessionWindow(10, 20); + private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair( + SessionKeySerde.toBinary( + new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer()), new LRUCacheEntry(cacheKey.getBytes()))) + .iterator(); + + @Test + public void shouldHaveNextFromStore() 
throws Exception { + final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator + = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator()); + assertTrue(mergeIterator.hasNext()); + } + + @Test + public void shouldGetNextFromStore() throws Exception { + final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator + = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator()); + assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey))); + } + + @Test + public void shouldPeekNextKeyFromStore() throws Exception { + final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator + = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator()); + assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow))); + } + + @Test + public void shouldHaveNextFromCache() throws Exception { + final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator + = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), + cacheKvs); + assertTrue(mergeIterator.hasNext()); + } + + @Test + public void shouldGetNextFromCache() throws Exception { + final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator + = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs); + assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey))); + } + + @Test + public void shouldPeekNextKeyFromCache() throws Exception { + final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator + = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs); + assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow))); + } + + @Test + public void shouldIterateBothStoreAndCache() throws Exception { + final 
MergedSortedCacheSessionStoreIterator<String, String> iterator = createIterator(storeKvs, cacheKvs); + assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey))); + assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey))); + assertFalse(iterator.hasNext()); + } + + private MergedSortedCacheSessionStoreIterator<String, String> createIterator(final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs, + final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs) { + final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator + = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs)); + + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator + = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs)); + return new MergedSortedCacheSessionStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String())); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java index b04f248..376fca8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; -import 
org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; @@ -30,23 +29,25 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class MergedSortedCacheWindowStoreIteratorTest { + private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>(); + private final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics())); + private final String namespace = "one"; + private final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String()); + @Test public void shouldIterateOverValueFromBothIterators() throws Exception { - final List<KeyValue<Bytes, byte[]>> storeValues = new ArrayList<>(); - final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics())); - final String namespace = "one"; - final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String()); final List<KeyValue<Long, byte[]>> expectedKvPairs = new ArrayList<>(); - for (long t = 0; t < 100; t += 20) { final byte[] v1Bytes = String.valueOf(t).getBytes(); - final KeyValue<Bytes, byte[]> v1 = KeyValue.pair(Bytes.wrap(WindowStoreUtils.toBinaryKey("a", t, 0, stateSerdes)), v1Bytes); - storeValues.add(v1); + final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, v1Bytes); + windowStoreKvPairs.add(v1); expectedKvPairs.add(KeyValue.pair(t, v1Bytes)); final byte[] keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes); final byte[] valBytes = String.valueOf(t + 10).getBytes(); @@ -56,11 +57,11 @@ public class MergedSortedCacheWindowStoreIteratorTest { byte[] 
binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); byte[] binaryTo = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes); - final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("name", new KeyValueIteratorStub<>(storeValues.iterator())); + final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("name", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, binaryFrom, binaryTo); - final MergedSortedCachedWindowStoreIterator<Bytes, byte[]> iterator = new MergedSortedCachedWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Bytes(), Serdes.ByteArray())); + final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())); int index = 0; while (iterator.hasNext()) { final KeyValue<Long, byte[]> next = iterator.next(); @@ -70,4 +71,18 @@ public class MergedSortedCacheWindowStoreIteratorTest { } } + @Test + public void shouldPeekNextKey() throws Exception { + windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes())); + cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes())); + byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); + byte[] binaryTo = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes); + final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("name", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, binaryFrom, binaryTo); + final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), 
Serdes.ByteArray())); + assertThat(iterator.peekNextKey(), equalTo(0L)); + iterator.next(); + assertThat(iterator.peekNextKey(), equalTo(10L)); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index 2082e00..a2ce96c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -105,6 +105,11 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, } @Override + public Long peekNextKey() { + throw new UnsupportedOperationException("peekNextKey not supported in stub"); + } + + @Override public boolean hasNext() { return underlying.hasNext(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java new file mode 100644 index 0000000..3d9a56c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class RocksDBKeyValueStoreSupplierTest { + + private static final String STORE_NAME = "name"; + private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new Metrics())); + private final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + new NoOpRecordCollector(), + cache); + private 
KeyValueStore<String, String> store; + + @After + public void close() { + store.close(); + } + + @Test + public void shouldCreateLoggingEnabledStoreWhenStoreLogged() throws Exception { + store = createStore(true, false); + final List<ProducerRecord> logged = new ArrayList<>(); + final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + logged.add(record); + } + }; + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + collector, + cache); + context.setTime(1); + store.init(context, store); + store.put("a", "b"); + assertFalse(logged.isEmpty()); + } + + @Test + public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() throws Exception { + store = createStore(false, false); + final List<ProducerRecord> logged = new ArrayList<>(); + final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + logged.add(record); + } + }; + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + collector, + cache); + context.setTime(1); + store.init(context, store); + store.put("a", "b"); + assertTrue(logged.isEmpty()); + } + + @Test + public void shouldReturnCachedKeyValueStoreWhenCachingEnabled() throws Exception { + store = createStore(false, true); + store.init(context, store); + context.setTime(1); + store.put("a", "b"); + store.put("b", "c"); + assertThat(store, is(instanceOf(CachingKeyValueStore.class))); + assertThat(cache.size(), is(2L)); + } + + @Test + public void shouldReturnMeteredStoreWhenCachingAndLoggingDisabled() throws Exception { + store = createStore(false, false); + assertThat(store, 
is(instanceOf(MeteredKeyValueStore.class))); + } + + @Test + public void shouldReturnMeteredStoreWhenCachingDisabled() throws Exception { + store = createStore(true, false); + assertThat(store, is(instanceOf(MeteredKeyValueStore.class))); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldHaveMeteredStoreWhenCached() throws Exception { + store = createStore(false, true); + store.init(context, store); + final StreamsMetrics metrics = context.metrics(); + assertFalse(metrics.metrics().isEmpty()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldHaveMeteredStoreWhenLogged() throws Exception { + store = createStore(true, false); + store.init(context, store); + final StreamsMetrics metrics = context.metrics(); + assertFalse(metrics.metrics().isEmpty()); + } + + @SuppressWarnings("unchecked") + private KeyValueStore<String, String> createStore(final boolean logged, final boolean cached) { + return new RocksDBKeyValueStoreSupplier<>(STORE_NAME, + Serdes.String(), + Serdes.String(), + logged, + Collections.EMPTY_MAP, + cached).get(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java new file mode 100644 index 0000000..28196a2 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class RocksDBSessionStoreSupplierTest { + + private static final String STORE_NAME = "name"; + private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new Metrics())); + private final MockProcessorContext context = new MockProcessorContext(null, + 
TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + new NoOpRecordCollector(), + cache); + + private SessionStore<String, String> store; + + @After + public void close() { + store.close(); + } + + @Test + public void shouldCreateLoggingEnabledStoreWhenStoreLogged() throws Exception { + store = createStore(true, false); + final List<ProducerRecord> logged = new ArrayList<>(); + final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + logged.add(record); + } + }; + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + collector, + cache); + context.setTime(1); + store.init(context, store); + store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b"); + assertFalse(logged.isEmpty()); + } + + @Test + public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() throws Exception { + store = createStore(false, false); + final List<ProducerRecord> logged = new ArrayList<>(); + final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + logged.add(record); + } + }; + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + collector, + cache); + context.setTime(1); + store.init(context, store); + store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b"); + assertTrue(logged.isEmpty()); + } + + @Test + public void shouldReturnCachedSessionStoreWhenCachingEnabled() throws Exception { + store = createStore(false, true); + store.init(context, store); + context.setTime(1); + store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b"); + store.put(new Windowed<>("b", new 
SessionWindow(0, 10)), "c"); + assertThat(store, is(instanceOf(CachingSessionStore.class))); + assertThat(cache.size(), is(2L)); + } + + @Test + public void shouldReturnRocksDbStoreWhenCachingAndLoggingDisabled() throws Exception { + store = createStore(false, false); + assertThat(store, is(instanceOf(RocksDBSessionStore.class))); + } + + @Test + public void shouldReturnRocksDbStoreWhenCachingDisabled() throws Exception { + store = createStore(true, false); + assertThat(store, is(instanceOf(RocksDBSessionStore.class))); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldHaveMeteredStoreWhenCached() throws Exception { + store = createStore(false, true); + store.init(context, store); + final StreamsMetrics metrics = context.metrics(); + assertFalse(metrics.metrics().isEmpty()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldHaveMeteredStoreWhenLogged() throws Exception { + store = createStore(true, false); + store.init(context, store); + final StreamsMetrics metrics = context.metrics(); + assertFalse(metrics.metrics().isEmpty()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception { + store = createStore(false, false); + store.init(context, store); + final StreamsMetrics metrics = context.metrics(); + assertFalse(metrics.metrics().isEmpty()); + } + + + + private SessionStore<String, String> createStore(final boolean logged, final boolean cached) { + return new RocksDBSessionStoreSupplier<>(STORE_NAME, + 10, + Serdes.String(), + Serdes.String(), + logged, + Collections.<String, String>emptyMap(), + cached).get(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java new file mode 100644 index 0000000..897ec62 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class RocksDBWindowStoreSupplierTest { + + private static final String STORE_NAME = "name"; + private WindowStore<String, String> store; + private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new Metrics())); + private final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + new NoOpRecordCollector(), + cache); + + @After + public void close() { + store.close(); + } + + @Test + public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception { + store = createStore(true, false); + final List<ProducerRecord> logged = new ArrayList<>(); + final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + logged.add(record); + } + }; + final MockProcessorContext context = new MockProcessorContext(null, + 
TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + collector, + cache); + context.setTime(1); + store.init(context, store); + store.put("a", "b"); + assertFalse(logged.isEmpty()); + } + + @Test + public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() throws Exception { + store = createStore(false, false); + final List<ProducerRecord> logged = new ArrayList<>(); + final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { + logged.add(record); + } + }; + final MockProcessorContext context = new MockProcessorContext(null, + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + collector, + cache); + context.setTime(1); + store.init(context, store); + store.put("a", "b"); + assertTrue(logged.isEmpty()); + } + + @Test + public void shouldBeCachedWindowStoreWhenCachingEnabled() throws Exception { + store = createStore(false, true); + store.init(context, store); + context.setTime(1); + store.put("a", "b"); + store.put("b", "c"); + assertThat(store, is(instanceOf(CachingWindowStore.class))); + assertThat(context.getCache().size(), is(2L)); + } + + @Test + public void shouldReturnRocksDbStoreWhenCachingAndLoggingDisabled() throws Exception { + store = createStore(false, false); + assertThat(store, is(instanceOf(RocksDBWindowStore.class))); + } + + @Test + public void shouldReturnRocksDbStoreWhenCachingDisabled() throws Exception { + store = createStore(true, false); + assertThat(store, is(instanceOf(RocksDBWindowStore.class))); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldHaveMeteredStoreWhenCached() throws Exception { + store = createStore(false, true); + store.init(context, store); + final StreamsMetrics metrics = context.metrics(); + assertFalse(metrics.metrics().isEmpty()); + } + + @SuppressWarnings("unchecked") + @Test + public void 
shouldHaveMeteredStoreWhenLogged() throws Exception { + store = createStore(true, false); + store.init(context, store); + final StreamsMetrics metrics = context.metrics(); + assertFalse(metrics.metrics().isEmpty()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception { + store = createStore(false, false); + store.init(context, store); + final StreamsMetrics metrics = context.metrics(); + assertFalse(metrics.metrics().isEmpty()); + } + + @SuppressWarnings("unchecked") + private WindowStore<String, String> createStore(final boolean logged, final boolean cached) { + return new RocksDBWindowStoreSupplier<>(STORE_NAME, + 10, + 3, + false, + Serdes.String(), + Serdes.String(), + 10, + logged, + Collections.<String, String>emptyMap(), + cached).get(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index a522592..79223de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -490,8 +490,6 @@ public class RocksDBWindowStoreTest { recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, false, true); - RocksDBWindowStore<Integer, String> inner = - (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); try { // to validate segments final Segments segments = new Segments(windowName, retentionPeriod, numSegments); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java new file mode 100644 index 0000000..8c0d2fe --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.KeyValueIteratorStub; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Unit tests for SerializedKeyValueIterator: a two-entry Bytes/byte[] iterator is wrapped in a stub, then in a peeking delegate, then in the iterator under test, which is read back through String serdes. */ public class SerializedKeyValueIteratorTest { + + private final StateSerdes<String, String> serdes = new StateSerdes<>("blah", Serdes.String(), Serdes.String()); + /* Raw fixture with exactly two serialized entries (hi to there, then hello to world). NOTE(review): getBytes() is called without a charset, so the encoding follows the JVM default; presumably harmless for these ASCII literals - confirm. */ private final Iterator<KeyValue<Bytes, byte[]>> iterator + = Arrays.asList(KeyValue.pair(Bytes.wrap("hi".getBytes()), "there".getBytes()), + KeyValue.pair(Bytes.wrap("hello".getBytes()), "world".getBytes())) + .iterator(); + private final DelegatingPeekingKeyValueIterator<Bytes, byte[]> peeking + = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(iterator)); + private final SerializedKeyValueIterator<String, String> serializedKeyValueIterator + = new SerializedKeyValueIterator<>(peeking, serdes); + + @Test + public void shouldReturnTrueOnHasNextWhenMoreResults() { + assertTrue(serializedKeyValueIterator.hasNext()); + } + + @Test + public void shouldReturnNextValueWhenItExists() throws Exception { + assertThat(serializedKeyValueIterator.next(), equalTo(KeyValue.pair("hi", "there"))); + assertThat(serializedKeyValueIterator.next(), equalTo(KeyValue.pair("hello", "world"))); + } + + @Test + public void shouldReturnFalseOnHasNextWhenNoMoreResults() throws Exception { + advanceIteratorToEnd(); + assertFalse(serializedKeyValueIterator.hasNext()); + } + + @Test + public void shouldThrowNoSuchElementOnNextWhenIteratorExhausted() 
throws Exception { + advanceIteratorToEnd(); + /* try/fail/catch form: fail() is reached only if next() returns without throwing NoSuchElementException. */ try { + serializedKeyValueIterator.next(); + fail("Expected NoSuchElementException on exhausted iterator"); + } catch (final NoSuchElementException nse) { + // pass + } + } + + @Test + public void shouldPeekNextKey() throws Exception { + assertThat(serializedKeyValueIterator.peekNextKey(), equalTo("hi")); + serializedKeyValueIterator.next(); + assertThat(serializedKeyValueIterator.peekNextKey(), equalTo("hello")); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowUnsupportedOperationOnRemove() throws Exception { + serializedKeyValueIterator.remove(); + } + + /* Calls next() twice, once per fixture entry, so the iterator under test is left exhausted. */ private void advanceIteratorToEnd() { + serializedKeyValueIterator.next(); + serializedKeyValueIterator.next(); + } + + +} \ No newline at end of file