KAFKA-3452 Follow-up: Refactoring StateStore hierarchies This is a follow-up to https://github.com/apache/kafka/pull/2166 - refactoring the store hierarchies as requested
Author: Damian Guy <damian....@gmail.com> Reviewers: Guozhang Wang <wangg...@gmail.com> Closes #2360 from dguy/state-store-refactor Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73b7ae00 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73b7ae00 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73b7ae00 Branch: refs/heads/trunk Commit: 73b7ae0019d387407375f3865e263225c986a6ce Parents: 825f225 Author: Damian Guy <damian....@gmail.com> Authored: Tue Jan 17 14:13:46 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Jan 17 14:13:46 2017 -0800 ---------------------------------------------------------------------- .../kstream/internals/SessionKeySerde.java | 17 ++ .../streams/state/WindowStoreIterator.java | 3 +- .../AbstractMergedSortedCacheStoreIterator.java | 166 +++++++++++++++ .../state/internals/CachingKeyValueStore.java | 10 +- .../state/internals/CachingSessionStore.java | 63 ++---- .../state/internals/CachingWindowStore.java | 64 +++--- .../ChangeLoggingKeyValueBytesStore.java | 93 +++++++++ .../internals/ChangeLoggingKeyValueStore.java | 127 ++++++++++++ .../ChangeLoggingSegmentedBytesStore.java | 28 +-- .../internals/CompositeReadOnlyWindowStore.java | 5 + .../DelegatingPeekingKeyValueIterator.java | 10 +- .../MergedSortedCacheKeyValueStoreIterator.java | 130 ++---------- .../MergedSortedCacheSessionStoreIterator.java | 71 +++++++ .../MergedSortedCacheWindowStoreIterator.java | 58 ++++++ .../MergedSortedCachedWindowStoreIterator.java | 107 ---------- .../state/internals/MeteredKeyValueStore.java | 27 +-- .../internals/MeteredSegmentedBytesStore.java | 27 +-- .../state/internals/MeteredWindowStore.java | 180 ---------------- .../internals/RocksDBKeyValueStoreSupplier.java | 54 +++-- .../internals/RocksDBSessionStoreSupplier.java | 54 +++-- .../streams/state/internals/RocksDBStore.java | 28 +-- .../state/internals/RocksDBWindowStore.java | 25 ++- 
.../internals/RocksDBWindowStoreSupplier.java | 37 ++-- .../internals/SerializedKeyValueIterator.java | 70 +++++++ .../state/internals/WindowStoreUtils.java | 3 + .../state/internals/WrappedStateStore.java | 90 ++++++++ .../internals/KGroupedStreamImplTest.java | 42 +++- .../internals/CachingSessionStoreTest.java | 3 +- .../state/internals/CachingWindowStoreTest.java | 5 +- .../ChangeLoggingKeyValueBytesStoreTest.java | 165 +++++++++++++++ .../ChangeLoggingKeyValueStoreTest.java | 207 +++++++++++++++++++ .../DelegatingPeekingKeyValueIteratorTest.java | 12 +- ...rgedSortedCacheSessionStoreIteratorTest.java | 113 ++++++++++ ...ergedSortedCacheWindowStoreIteratorTest.java | 35 +++- .../internals/ReadOnlyWindowStoreStub.java | 5 + .../RocksDBKeyValueStoreSupplierTest.java | 155 ++++++++++++++ .../RocksDBSessionStoreSupplierTest.java | 169 +++++++++++++++ .../RocksDBWindowStoreSupplierTest.java | 168 +++++++++++++++ .../state/internals/RocksDBWindowStoreTest.java | 2 - .../SerializedKeyValueIteratorTest.java | 95 +++++++++ 40 files changed, 2081 insertions(+), 642 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java index 48213d6..d9a3528 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import 
org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import java.nio.ByteBuffer; @@ -146,4 +147,20 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { buf.putLong(sessionKey.window().start()); return new Bytes(buf.array()); } + + public static Bytes bytesToBinary(final Windowed<Bytes> sessionKey) { + final byte[] bytes = sessionKey.key().get(); + ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE); + buf.put(bytes); + buf.putLong(sessionKey.window().end()); + buf.putLong(sessionKey.window().start()); + return new Bytes(buf.array()); + } + + public static Window extractWindow(final byte [] binaryKey) { + final ByteBuffer buffer = ByteBuffer.wrap(binaryKey); + final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE); + final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE); + return new TimeWindow(start, end); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java index b6e6d0c..958b778 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java @@ -22,7 +22,6 @@ package org.apache.kafka.streams.state; import org.apache.kafka.streams.KeyValue; import java.io.Closeable; -import java.util.Iterator; /** * Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}. 
@@ -32,7 +31,7 @@ import java.util.Iterator; * * @param <E> Type of values */ -public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>>, Closeable { +public interface WindowStoreIterator<V> extends KeyValueIterator<Long, V>, Closeable { @Override void close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java new file mode 100644 index 0000000..009dad0 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StateSerdes; + +import java.util.NoSuchElementException; + +/** + * Merges two iterators. Assumes each of them is sorted by key + * + * @param <K> + * @param <V> + */ +abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyValueIterator<K, V> { + private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator; + private final KeyValueIterator<KS, byte[]> storeIterator; + protected final StateSerdes<K, V> serdes; + + AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, + final KeyValueIterator<KS, byte[]> storeIterator, + final StateSerdes<K, V> serdes) { + this.cacheIterator = cacheIterator; + this.storeIterator = storeIterator; + this.serdes = serdes; + } + + abstract int compare(final Bytes cacheKey, final KS storeKey); + + abstract K deserializeStoreKey(final KS key); + + abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, byte[]> pair); + + abstract K deserializeCacheKey(final Bytes cacheKey); + + private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) { + return nextFromCache.value.value == null; + } + + @Override + public boolean hasNext() { + // skip over items deleted from cache, and corresponding store items if they have the same key + while (cacheIterator.hasNext() && isDeletedCacheEntry(cacheIterator.peekNext())) { + if (storeIterator.hasNext()) { + final KS nextStoreKey = storeIterator.peekNextKey(); + // advance the store iterator if the key is the same as the deleted cache key + if (compare(cacheIterator.peekNextKey(), nextStoreKey) == 0) { + storeIterator.next(); + } + } + cacheIterator.next(); + } + + return cacheIterator.hasNext() || storeIterator.hasNext(); + } + + @Override + 
public KeyValue<K, V> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final Bytes nextCacheKey = cacheIterator.hasNext() ? cacheIterator.peekNextKey() : null; + final KS nextStoreKey = storeIterator.hasNext() ? storeIterator.peekNextKey() : null; + + if (nextCacheKey == null) { + return nextStoreValue(nextStoreKey); + } + + if (nextStoreKey == null) { + return nextCacheValue(nextCacheKey); + } + + final int comparison = compare(nextCacheKey, nextStoreKey); + if (comparison > 0) { + return nextStoreValue(nextStoreKey); + } else if (comparison < 0) { + return nextCacheValue(nextCacheKey); + } else { + // skip the same keyed element + storeIterator.next(); + return nextCacheValue(nextCacheKey); + } + } + + private KeyValue<K, V> nextStoreValue(KS nextStoreKey) { + final KeyValue<KS, byte[]> next = storeIterator.next(); + + if (!next.key.equals(nextStoreKey)) { + throw new IllegalStateException("Next record key is not the peeked key value; this should not happen"); + } + + return deserializeStorePair(next); + } + + private KeyValue<K, V> nextCacheValue(Bytes nextCacheKey) { + final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next(); + + if (!next.key.equals(nextCacheKey)) { + throw new IllegalStateException("Next record key is not the peeked key value; this should not happen"); + } + + return KeyValue.pair(deserializeCacheKey(next.key), serdes.valueFrom(next.value.value)); + } + + @Override + public K peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final Bytes nextCacheKey = cacheIterator.hasNext() ? cacheIterator.peekNextKey() : null; + final KS nextStoreKey = storeIterator.hasNext() ? 
storeIterator.peekNextKey() : null; + + if (nextCacheKey == null) { + return deserializeStoreKey(nextStoreKey); + } + + if (nextStoreKey == null) { + return serdes.keyFrom(nextCacheKey.get()); + } + + final int comparison = compare(nextCacheKey, nextStoreKey); + if (comparison > 0) { + return deserializeStoreKey(nextStoreKey); + } else if (comparison < 0) { + return deserializeCacheKey(nextCacheKey); + } else { + // skip the same keyed element + storeIterator.next(); + return deserializeCacheKey(nextCacheKey); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() is not supported"); + } + + @Override + public void close() { + cacheIterator.close(); + storeIterator.close(); + } +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index fdb03fd..9a0a976 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.state.StateSerdes; import java.util.List; -class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStore<K, V> { +class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K, V>, CachedStateStore<K, V> { private final KeyValueStore<Bytes, byte[]> underlying; private final Serde<K> keySerde; @@ -234,4 +234,12 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor KeyValueStore<Bytes, byte[]> underlying() { return underlying; } + + @Override + public StateStore inner() { + if (underlying instanceof 
WrappedStateStore) { + return ((WrappedStateStore) underlying).inner(); + } + return underlying; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 17c4ee0..fec6609 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; @@ -35,21 +34,22 @@ import java.util.List; import java.util.NoSuchElementException; -class CachingSessionStore<K, AGG> implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> { +class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedStateStore implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> { - private final SegmentedBytesStore bytesStore; + private final SessionStore<Bytes, byte[]> bytesStore; private final SessionKeySchema keySchema; private Serde<K> keySerde; private final Serde<AGG> aggSerde; private InternalProcessorContext context; private String name; - private StateSerdes<Windowed<K>, AGG> serdes; + private StateSerdes<K, AGG> serdes; private ThreadCache cache; private CacheFlushListener<Windowed<K>, AGG> flushListener; - 
CachingSessionStore(final SegmentedBytesStore bytesStore, + CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore, final Serde<K> keySerde, final Serde<AGG> aggSerde) { + super(bytesStore); this.bytesStore = bytesStore; this.keySerde = keySerde; this.aggSerde = aggSerde; @@ -65,12 +65,12 @@ class CachingSessionStore<K, AGG> implements SessionStore<K, AGG>, CachedStateS keySchema.lowerRange(binarySessionId, earliestSessionEndTime).get(), keySchema.upperRange(binarySessionId, latestSessionStartTime).get()); - final KeyValueIterator<Bytes, byte[]> storeIterator = bytesStore.fetch(binarySessionId, earliestSessionEndTime, latestSessionStartTime); + final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions(binarySessionId, earliestSessionEndTime, latestSessionStartTime); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionId, - earliestSessionEndTime, - latestSessionStartTime); + earliestSessionEndTime, + latestSessionStartTime); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition); - return new MergedSortedCacheKeyValueStoreIterator<>(filteredCacheIterator, storeIterator, serdes); + return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes); } @@ -92,11 +92,6 @@ class CachingSessionStore<K, AGG> implements SessionStore<K, AGG>, CachedStateS return findSessions(key, 0, Long.MAX_VALUE); } - - public String name() { - return bytesStore.name(); - } - @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { bytesStore.init(context, root); @@ -107,14 +102,9 @@ class CachingSessionStore<K, AGG> implements SessionStore<K, AGG>, CachedStateS private void initInternal(final InternalProcessorContext context) { this.context = context; - if (keySerde == null) { - keySerde = (Serde<K>) context.keySerde(); - } - - - this.serdes = 
(StateSerdes<Windowed<K>, AGG>) new StateSerdes<>(bytesStore.name(), - new SessionKeySerde<>(keySerde), - aggSerde == null ? context.valueSerde() : aggSerde); + this.serdes = new StateSerdes<>(bytesStore.name(), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); this.name = context.taskId() + "-" + bytesStore.name(); @@ -135,27 +125,27 @@ class CachingSessionStore<K, AGG> implements SessionStore<K, AGG>, CachedStateS final RecordContext current = context.recordContext(); context.setRecordContext(entry.recordContext()); try { + final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer()); if (flushListener != null) { - final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer()); final AGG newValue = serdes.valueFrom(entry.newValue()); final AGG oldValue = fetchPrevious(binaryKey); if (!(newValue == null && oldValue == null)) { flushListener.apply(key, newValue == null ? 
null : newValue, oldValue); } - } - bytesStore.put(binaryKey, entry.newValue()); + bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue()); } finally { context.setRecordContext(current); } } private AGG fetchPrevious(final Bytes key) { - final byte[] bytes = bytesStore.get(key); - if (bytes == null) { - return null; + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore.fetch(key)) { + if (!iterator.hasNext()) { + return null; + } + return serdes.valueFrom(iterator.next().value); } - return serdes.valueFrom(bytes); } @@ -170,25 +160,10 @@ class CachingSessionStore<K, AGG> implements SessionStore<K, AGG>, CachedStateS cache.close(name); } - public boolean persistent() { - return bytesStore.persistent(); - } - - public boolean isOpen() { - return bytesStore.isOpen(); - } - public void setFlushListener(CacheFlushListener<Windowed<K>, AGG> flushListener) { this.flushListener = flushListener; } - private void validateStoreOpen() { - if (!isOpen()) { - throw new InvalidStateStoreException("Store " + this.name + " is currently closed"); - } - } - - private static class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> { private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator; private final HasNextCondition hasNextCondition; http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index bd252f1..d471761 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -17,8 +17,8 @@ package 
org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.kstream.internals.TimeWindow; @@ -26,16 +26,15 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.RecordContext; -import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import java.util.List; -class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> { +class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateStore implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> { - private final SegmentedBytesStore underlying; + private final WindowStore<Bytes, byte[]> underlying; private final Serde<K> keySerde; private final Serde<V> valueSerde; private CacheFlushListener<Windowed<K>, V> flushListener; @@ -45,21 +44,17 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi private InternalProcessorContext context; private StateSerdes<K, V> serdes; - CachingWindowStore(final SegmentedBytesStore underlying, + CachingWindowStore(final WindowStore<Bytes, byte[]> underlying, final Serde<K> keySerde, final Serde<V> valueSerde, final long windowSize) { + super(underlying); this.underlying = underlying; this.keySerde = keySerde; this.valueSerde = valueSerde; this.windowSize = windowSize; } - @Override - public String name() { - 
return underlying.name(); - } - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, final StateStore root) { @@ -80,13 +75,14 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi @Override public void apply(final List<ThreadCache.DirtyEntry> entries) { for (ThreadCache.DirtyEntry entry : entries) { - final byte[] binaryKey = entry.key().get(); - final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryKey); - final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey); - final Windowed<K> windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes), + final byte[] binaryWindowKey = entry.key().get(); + final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryWindowKey); + + final Windowed<K> windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryWindowKey, serdes), new TimeWindow(timestamp, timestamp + windowSize)); - maybeForward(entry, Bytes.wrap(binaryKey), windowedKey, (InternalProcessorContext) context); - underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, timestamp, 0, WindowStoreUtils.INNER_SERDES)), entry.newValue()); + final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryWindowKey); + maybeForward(entry, key, windowedKey, (InternalProcessorContext) context); + underlying.put(key, entry.newValue(), timestamp); } } }); @@ -102,7 +98,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi context.setRecordContext(entry.recordContext()); try { flushListener.apply(windowedKey, - serdes.valueFrom(entry.newValue()), fetchPrevious(key)); + serdes.valueFrom(entry.newValue()), fetchPrevious(key, windowedKey.window().start())); } finally { context.setRecordContext(current); } @@ -128,16 +124,6 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi } @Override - public boolean persistent() { - return underlying.persistent(); - } - - @Override - public 
boolean isOpen() { - return underlying.isOpen(); - } - - @Override public synchronized void put(final K key, final V value) { put(key, value, context.timestamp()); } @@ -158,23 +144,21 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes); byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes); - final KeyValueIterator<Bytes, byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo); + final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, binaryFrom, binaryTo); - return new MergedSortedCachedWindowStoreIterator<>(cacheIterator, new DelegatingPeekingKeyValueIterator<>(name, underlyingIterator), serdes); + return new MergedSortedCacheWindowStoreIterator<>(cacheIterator, + underlyingIterator, + new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde())); } - private V fetchPrevious(final Bytes key) { - final byte[] result = underlying.get(key); - if (result == null) { - return null; - } - return serdes.valueFrom(result); - } - - private void validateStoreOpen() { - if (!isOpen()) { - throw new InvalidStateStoreException("Store " + this.name + " is currently closed"); + private V fetchPrevious(final Bytes key, final long timestamp) { + try (final WindowStoreIterator<byte[]> iter = underlying.fetch(key, timestamp, timestamp)) { + if (!iter.hasNext()) { + return null; + } else { + return serdes.valueFrom(iter.next().value); + } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
new file mode 100644
index 0000000..e31d04b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.List;
+/**
+ * Wrapper around a {@code KeyValueStore<Bytes, byte[]>} that forwards every call to the
+ * wrapped store and additionally reports each written key/value pair to a
+ * {@link StoreChangeLogger}. Read operations are pure delegation and log nothing.
+ */
+public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore<Bytes, byte[]> {
+    private final KeyValueStore<Bytes, byte[]> inner;
+    private StoreChangeLogger<Bytes, byte[]> changeLogger;
+    /**
+     * @param inner the bytes store that receives every read and write
+     */
+    public ChangeLoggingKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) {
+        super(inner);
+        this.inner = inner;
+    }
+    /**
+     * Initializes the wrapped store first, then creates the change logger using the
+     * wrapped store's name, the given context and {@code WindowStoreUtils.INNER_SERDES}.
+     */
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        inner.init(context, root);
+        this.changeLogger = new StoreChangeLogger<>(inner.name(), context, WindowStoreUtils.INNER_SERDES);
+    }
+
+    /**
+     * Writes the pair to the wrapped store, then logs the same pair.
+     */
+    @Override
+    public void put(final Bytes key, final byte[] value) {
+        inner.put(key, value);
+        changeLogger.logChange(key, value);
+    }
+    /**
+     * Stores (and logs, via {@link #put}) the value only when the key currently maps to
+     * {@code null}.
+     *
+     * @return the value found before the call, or {@code null} if there was none
+     */
+    @Override
+    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        final byte[] previous = get(key);
+        if (previous == null) {
+            put(key, value);
+        }
+        return previous;
+    }
+    /**
+     * Hands the whole list to the wrapped store in one call, then logs each entry
+     * individually in list order.
+     */
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        inner.putAll(entries);
+        for (KeyValue<Bytes, byte[]> entry : entries) {
+            changeLogger.logChange(entry.key, entry.value);
+        }
+    }
+    /**
+     * Reads the current value, then writes and logs a {@code null} value for the key.
+     * NOTE(review): relies on the wrapped store treating a null value as a removal - confirm
+     * against the inner implementation.
+     *
+     * @return the value read before the null was written, possibly {@code null}
+     */
+    @Override
+    public byte[] delete(final Bytes key) {
+        final byte[] oldValue = inner.get(key);
+        put(key, null);
+        return oldValue;
+    }
+    /** Delegates to the wrapped store; nothing is logged. */
+    @Override
+    public byte[] get(final Bytes key) {
+        return inner.get(key);
+    }
+    /** Delegates to the wrapped store; nothing is logged. */
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        return inner.range(from, to);
+    }
+    /** Delegates to the wrapped store; nothing is logged. */
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        return inner.all();
+    }
+    /** Delegates to the wrapped store. */
+    @Override
+    public long approximateNumEntries() {
+        return inner.approximateNumEntries();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java new file mode 100644 index 0000000..11cf802 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Typed {@link KeyValueStore} layered on a {@link ChangeLoggingKeyValueBytesStore}: keys and
+ * values are converted to and from bytes with {@link StateSerdes}, and every operation is
+ * forwarded to the wrapped bytes store.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore<K, V> {
+    private final ChangeLoggingKeyValueBytesStore innerBytes;
+    private final Serde keySerde;
+    private final Serde valueSerde;
+    private StateSerdes<K, V> serdes;
+
+    /**
+     * Wraps the given bytes store in a {@link ChangeLoggingKeyValueBytesStore}.
+     *
+     * @param bytesStore the underlying bytes store
+     * @param keySerde   key serde, or {@code null} to use the context's key serde at {@link #init}
+     * @param valueSerde value serde, or {@code null} to use the context's value serde at {@link #init}
+     */
+    ChangeLoggingKeyValueStore(final KeyValueStore<Bytes, byte[]> bytesStore,
+                               final Serde keySerde,
+                               final Serde valueSerde) {
+        this(new ChangeLoggingKeyValueBytesStore(bytesStore), keySerde, valueSerde);
+    }
+
+    private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore bytesStore,
+                                       final Serde keySerde,
+                                       final Serde valueSerde) {
+        super(bytesStore);
+        this.innerBytes = bytesStore;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    /**
+     * @return the name of the wrapped bytes store, i.e. the same name {@link #init} passes to
+     *         {@link StateSerdes}; previously this override returned {@code null}
+     */
+    @Override
+    public String name() {
+        return innerBytes.name();
+    }
+
+    /**
+     * Initializes the wrapped bytes store, then builds the serdes. A serde that was passed as
+     * {@code null} to the constructor is replaced by the corresponding serde of the context.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        innerBytes.init(context, root);
+
+        this.serdes = new StateSerdes<>(innerBytes.name(),
+                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+    }
+
+    /**
+     * Serializes the pair and forwards it to the wrapped bytes store.
+     */
+    @Override
+    public void put(final K key, final V value) {
+        final Bytes bytesKey = Bytes.wrap(serdes.rawKey(key));
+        final byte[] bytesValue = serdes.rawValue(value);
+        innerBytes.put(bytesKey, bytesValue);
+    }
+
+    /**
+     * Stores the value only when {@link #get} currently returns {@code null} for the key.
+     *
+     * @return the value found before the call, or {@code null} if there was none
+     */
+    @Override
+    public V putIfAbsent(final K key, final V value) {
+        final V v = get(key);
+        if (v == null) {
+            put(key, value);
+        }
+        return v;
+    }
+
+    /**
+     * Serializes every entry and forwards the resulting list to the wrapped bytes store in a
+     * single call.
+     */
+    @Override
+    public void putAll(final List<KeyValue<K, V>> entries) {
+        // Presized: exactly one serialized pair is produced per input entry.
+        final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>(entries.size());
+        for (final KeyValue<K, V> entry : entries) {
+            keyValues.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)));
+        }
+        innerBytes.putAll(keyValues);
+    }
+
+    /**
+     * Deletes the key from the wrapped bytes store.
+     *
+     * @return the deserialized previous value, or {@code null} if the bytes store returned none
+     */
+    @Override
+    public V delete(final K key) {
+        final byte[] oldValue = innerBytes.delete(Bytes.wrap(serdes.rawKey(key)));
+        if (oldValue == null) {
+            return null;
+        }
+        return serdes.valueFrom(oldValue);
+    }
+
+    /**
+     * @return the deserialized value for the key, or {@code null} if the bytes store returned none
+     */
+    @Override
+    public V get(final K key) {
+        final byte[] rawValue = innerBytes.get(Bytes.wrap(serdes.rawKey(key)));
+        if (rawValue == null) {
+            return null;
+        }
+        return serdes.valueFrom(rawValue);
+    }
+
+    /**
+     * Serializes both bounds, queries the wrapped bytes store and wraps the result in a
+     * {@link SerializedKeyValueIterator} built with this store's serdes.
+     */
+    @Override
+    public KeyValueIterator<K, V> range(final K from, final K to) {
+        return new SerializedKeyValueIterator<>(innerBytes.range(Bytes.wrap(serdes.rawKey(from)),
+                                                                 Bytes.wrap(serdes.rawKey(to))),
+                                                serdes);
+    }
+
+    /**
+     * Wraps the bytes store's full iterator in a {@link SerializedKeyValueIterator} built with
+     * this store's serdes.
+     */
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return new SerializedKeyValueIterator<>(innerBytes.all(), serdes);
+    }
+
+    /** Delegates to the wrapped bytes store. */
+    @Override
+    public long approximateNumEntries() {
+        return innerBytes.approximateNumEntries();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java index 14b8f17..21c2866 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java @@ -25,13 +25,14 @@ import org.apache.kafka.streams.state.KeyValueIterator; * Simple wrapper around a {@link SegmentedBytesStore} to support writing * updates to a changelog */ -class ChangeLoggingSegmentedBytesStore implements SegmentedBytesStore { +class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractWrappedStateStore implements SegmentedBytesStore { private final SegmentedBytesStore bytesStore; private StoreChangeLogger<Bytes, byte[]> changeLogger; ChangeLoggingSegmentedBytesStore(final SegmentedBytesStore bytesStore) { + super(bytesStore); this.bytesStore = bytesStore; } @@ -60,10 +61,6 @@ class ChangeLoggingSegmentedBytesStore implements SegmentedBytesStore { return bytesStore.get(key); } - @Override - public String name() { - return bytesStore.name(); - } @Override @SuppressWarnings("unchecked") @@ -71,25 +68,4 @@ class ChangeLoggingSegmentedBytesStore implements SegmentedBytesStore { bytesStore.init(context, root); changeLogger = new StoreChangeLogger<>(name(), context, WindowStoreUtils.INNER_SERDES); } - - @Override - public void flush() { - bytesStore.flush(); - } - - @Override - public void close() { - bytesStore.close(); - } - - @Override - public boolean persistent() { - return bytesStore.persistent(); - } - - @Override - public boolean isOpen() { - return bytesStore.isOpen(); - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java index b33c0f0..e0f1ec8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java @@ -62,6 +62,11 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K } @Override + public Long peekNextKey() { + throw new NoSuchElementException(); + } + + @Override public boolean hasNext() { return false; } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java index eb57ace..f3101b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.state.KeyValueIterator; import java.util.NoSuchElementException; -public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V> { +public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> { private final String storeName; private final KeyValueIterator<K, V> underlying; private KeyValue<K, V> next; @@ -78,4 +78,12 @@ public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator public void remove() { throw new UnsupportedOperationException("remove not supported"); } + 
+ @Override + public KeyValue<K, V> peekNext() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return next; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java index c9a6866..b860e16 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java @@ -21,141 +21,37 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; -import java.util.Comparator; -import java.util.NoSuchElementException; - /** * Merges two iterators. 
Assumes each of them is sorted by key * * @param <K> * @param <V> */ -class MergedSortedCacheKeyValueStoreIterator<K, V> implements KeyValueIterator<K, V> { - private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator; - private final KeyValueIterator<Bytes, byte[]> storeIterator; - private final StateSerdes<K, V> serdes; - private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR; - - public MergedSortedCacheKeyValueStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, - final KeyValueIterator<Bytes, byte[]> storeIterator, - final StateSerdes<K, V> serdes) { - this.cacheIterator = cacheIterator; - this.storeIterator = storeIterator; - this.serdes = serdes; - } - - @Override - public boolean hasNext() { - while (cacheIterator.hasNext() && isDeletedCacheEntry(cacheIterator.peekNext())) { - if (storeIterator.hasNext()) { - final Bytes storeKey = storeIterator.peekNextKey(); - // advance the store iterator if the key is the same as the deleted cache key - if (storeKey.equals(cacheIterator.peekNextKey())) { - storeIterator.next(); - } - } - // skip over items deleted from cache - cacheIterator.next(); - } - return cacheIterator.hasNext() || storeIterator.hasNext(); - } +class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedCacheStoreIterator<K, Bytes, V> { - - private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) { - return nextFromCache.value.value == null; + MergedSortedCacheKeyValueStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, + final KeyValueIterator<Bytes, byte[]> storeIterator, + final StateSerdes<K, V> serdes) { + super(cacheIterator, storeIterator, serdes); } - @Override - public KeyValue<K, V> next() { - - return internalNext(new NextValueFunction<KeyValue<K, V>>() { - @Override - public KeyValue<K, V> apply(final byte[] cacheKey, final byte[] storeKey) { - if (cacheKey == null) { - return nextStoreValue(); - 
} - - if (storeKey == null) { - return nextCacheValue(); - } - - final int comparison = comparator.compare(cacheKey, storeKey); - if (comparison > 0) { - return nextStoreValue(); - } else if (comparison < 0) { - return nextCacheValue(); - } else { - storeIterator.next(); - return nextCacheValue(); - } - } - }); + public KeyValue<K, V> deserializeStorePair(KeyValue<Bytes, byte[]> pair) { + return KeyValue.pair(serdes.keyFrom(pair.key.get()), serdes.valueFrom(pair.value)); } @Override - public K peekNextKey() { - return internalNext(new NextValueFunction<K>() { - @Override - public K apply(final byte[] cacheKey, final byte[] storeKey) { - if (cacheKey == null) { - return serdes.keyFrom(storeKey); - } - - if (storeKey == null) { - return serdes.keyFrom(cacheKey); - } - - final int comparison = comparator.compare(cacheKey, storeKey); - if (comparison > 0) { - return serdes.keyFrom(storeKey); - } else { - return serdes.keyFrom(cacheKey); - } - } - }); - } - - interface NextValueFunction<T> { - T apply(final byte[] cacheKey, final byte [] storeKey); - } - - private <T> T internalNext(final NextValueFunction<T> nextValueFunction) { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - byte[] nextCacheKey = null; - if (cacheIterator.hasNext()) { - nextCacheKey = cacheIterator.peekNextKey().get(); - } - - byte[] nextStoreKey = null; - if (storeIterator.hasNext()) { - nextStoreKey = storeIterator.peekNextKey().get(); - } - - return nextValueFunction.apply(nextCacheKey, nextStoreKey); - } - - private KeyValue<K, V> nextCacheValue() { - final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next(); - return KeyValue.pair(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value.value)); - } - - private KeyValue<K, V> nextStoreValue() { - final KeyValue<Bytes, byte[]> next = storeIterator.next(); - return KeyValue.pair(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value)); + K deserializeCacheKey(final Bytes cacheKey) { + return 
serdes.keyFrom(cacheKey.get()); } @Override - public void remove() { - throw new UnsupportedOperationException("remove not supported"); + public K deserializeStoreKey(Bytes key) { + return serdes.keyFrom(key.get()); } @Override - public void close() { - cacheIterator.close(); - storeIterator.close(); + public int compare(Bytes cacheKey, Bytes storeKey) { + return cacheKey.compareTo(storeKey); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java new file mode 100644 index 0000000..db64621 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+/**
+ * Session-store flavour of the merged cache/store iterator. Both inputs are assumed to be
+ * sorted by key.
+ *
+ * @param <K>   type of the un-windowed record key
+ * @param <AGG> type of the aggregate value
+ */
+class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSortedCacheStoreIterator<Windowed<K>, Windowed<Bytes>, AGG> {
+    // Serdes for the plain (un-windowed) key, as handed to the constructor.
+    private final StateSerdes<K, AGG> rawSerdes;
+
+
+    MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
+                                          final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator,
+                                          final StateSerdes<K, AGG> serdes) {
+        super(cacheIterator, storeIterator, new StateSerdes<>(serdes.stateName(),
+                                                             new SessionKeySerde<>(serdes.keySerde()),
+                                                             serdes.valueSerde()));
+
+        this.rawSerdes = serdes;
+    }
+
+    /**
+     * Turns a raw store entry into a typed one: the key bytes go through the plain key
+     * deserializer, the window is carried over, the value goes through the value deserializer.
+     */
+    @Override
+    public KeyValue<Windowed<K>, AGG> deserializeStorePair(KeyValue<Windowed<Bytes>, byte[]> pair) {
+        final Windowed<Bytes> rawKey = pair.key;
+        final Windowed<K> typedKey = new Windowed<>(rawSerdes.keyFrom(rawKey.key().get()), rawKey.window());
+        return KeyValue.pair(typedKey, serdes.valueFrom(pair.value));
+    }
+
+    /**
+     * Decodes a binary cache key through {@code SessionKeySerde.from}.
+     */
+    @Override
+    Windowed<K> deserializeCacheKey(final Bytes cacheKey) {
+        final byte[] binaryKey = cacheKey.get();
+        return SessionKeySerde.from(binaryKey, rawSerdes.keyDeserializer());
+    }
+
+    /**
+     * Deserializes the key bytes of a store key and re-attaches the store key's window.
+     */
+    @Override
+    public Windowed<K> deserializeStoreKey(Windowed<Bytes> key) {
+        return new Windowed<K>(rawSerdes.keyFrom(key.key().get()), key.window());
+    }
+
+    /**
+     * Orders a cache key against a store key by first converting the store key to its binary
+     * form with {@code SessionKeySerde.bytesToBinary}.
+     */
+    @Override
+    public int compare(Bytes cacheKey, Windowed<Bytes> storeKey) {
+        return cacheKey.compareTo(SessionKeySerde.bytesToBinary(storeKey));
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
new file mode 100644
index 0000000..a9d0973
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * Window-store flavour of the merged cache/store iterator; entries are keyed by timestamp.
+ * Both inputs are assumed to be sorted by key.
+ *
+ * @param <V> value type
+ */
+class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheStoreIterator<Long, Long, V> implements WindowStoreIterator<V> {
+
+    MergedSortedCacheWindowStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
+                                         final KeyValueIterator<Long, byte[]> storeIterator,
+                                         final StateSerdes<Long, V> serdes) {
+        super(cacheIterator, storeIterator, serdes);
+    }
+
+    /**
+     * Keeps the store key as is and deserializes only the value.
+     */
+    @Override
+    public KeyValue<Long, V> deserializeStorePair(final KeyValue<Long, byte[]> pair) {
+        final V value = serdes.valueFrom(pair.value);
+        return KeyValue.pair(pair.key, value);
+    }
+
+    /**
+     * Extracts the timestamp from a binary cache key.
+     */
+    @Override
+    Long deserializeCacheKey(final Bytes cacheKey) {
+        final byte[] binaryKey = cacheKey.get();
+        return WindowStoreUtils.timestampFromBinaryKey(binaryKey);
+    }
+
+    /**
+     * Store keys are already timestamps, so the key is returned unchanged.
+     */
+    @Override
+    public Long deserializeStoreKey(final Long key) {
+        return key;
+    }
+
+    /**
+     * Orders a cache entry against a store entry by comparing the timestamp decoded from the
+     * cache key with the store key.
+     */
+    @Override
+    public int compare(final Bytes cacheKey, final Long storeKey) {
+        final long cacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(cacheKey.get());
+        return Long.compare(cacheTimestamp, storeKey);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java deleted file mode 100644 index e210e73..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.StateSerdes; -import org.apache.kafka.streams.state.WindowStoreIterator; - -import java.util.NoSuchElementException; - -/** - * Merges two iterators. Assumes each of them is sorted by key - * - * @param <K> - * @param <V> - */ -class MergedSortedCachedWindowStoreIterator<K, V> implements WindowStoreIterator<V> { - private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator; - private final KeyValueIterator<Bytes, byte[]> storeIterator; - private final StateSerdes<K, V> serdes; - - public MergedSortedCachedWindowStoreIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator, - final KeyValueIterator<Bytes, byte[]> storeIterator, - final StateSerdes<K, V> serdes) { - this.cacheIterator = cacheIterator; - this.storeIterator = storeIterator; - this.serdes = serdes; - } - - @Override - public boolean hasNext() { - return cacheIterator.hasNext() || storeIterator.hasNext(); - } - - - @Override - public KeyValue<Long, V> next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - Long nextCacheTimestamp = null; - if (cacheIterator.hasNext()) { - nextCacheTimestamp = 
WindowStoreUtils.timestampFromBinaryKey(cacheIterator.peekNextKey().get()); - } - - Long nextStoreTimestamp = null; - if (storeIterator.hasNext()) { - nextStoreTimestamp = WindowStoreUtils.timestampFromBinaryKey(storeIterator.peekNextKey().get()); - } - - if (nextCacheTimestamp == null) { - return nextStoreValue(nextStoreTimestamp); - } - - if (nextStoreTimestamp == null) { - return nextCacheValue(nextCacheTimestamp); - } - - final int comparison = nextCacheTimestamp.compareTo(nextStoreTimestamp); - if (comparison > 0) { - return nextStoreValue(nextStoreTimestamp); - } else if (comparison < 0) { - return nextCacheValue(nextCacheTimestamp); - } else { - storeIterator.next(); - return nextCacheValue(nextCacheTimestamp); - } - } - - private KeyValue<Long, V> nextCacheValue(final Long timestamp) { - final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next(); - return KeyValue.pair(timestamp, serdes.valueFrom(next.value.value)); - } - - private KeyValue<Long, V> nextStoreValue(final Long timestamp) { - final KeyValue<Bytes, byte[]> next = storeIterator.next(); - return KeyValue.pair(timestamp, serdes.valueFrom(next.value)); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove not supported"); - } - - @Override - public void close() { - cacheIterator.close(); - storeIterator.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 926e5d4..7dc2d33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -35,7 +35,7 
@@ import java.util.List; * @param <K> * @param <V> */ -public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { +public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore<K, V> { protected final KeyValueStore<K, V> inner; protected final String metricScope; @@ -102,18 +102,16 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { }; // always wrap the store with the metered store - public MeteredKeyValueStore(final KeyValueStore<K, V> inner, String metricScope, Time time) { + public MeteredKeyValueStore(final KeyValueStore<K, V> inner, + final String metricScope, + final Time time) { + super(inner); this.inner = inner; this.metricScope = metricScope; this.time = time != null ? time : Time.SYSTEM; } @Override - public String name() { - return inner.name(); - } - - @Override public void init(ProcessorContext context, StateStore root) { final String name = name(); this.context = context; @@ -134,16 +132,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { } @Override - public boolean persistent() { - return inner.persistent(); - } - - @Override - public boolean isOpen() { - return inner.isOpen(); - } - - @Override public V get(K key) { this.key = key; metrics.measureLatencyNs(time, getDelegate, this.getTime); @@ -194,11 +182,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { } @Override - public void close() { - inner.close(); - } - - @Override public void flush() { metrics.measureLatencyNs(time, flushDelegate, this.flushTime); } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java index e0ed03e..4eb3936 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java @@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; -class MeteredSegmentedBytesStore implements SegmentedBytesStore { +class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractWrappedStateStore implements SegmentedBytesStore { private final SegmentedBytesStore inner; private final String metricScope; @@ -40,18 +40,16 @@ class MeteredSegmentedBytesStore implements SegmentedBytesStore { private Sensor getTime; private Sensor removeTime; - MeteredSegmentedBytesStore(final SegmentedBytesStore inner, String metricScope, Time time) { + MeteredSegmentedBytesStore(final SegmentedBytesStore inner, + final String metricScope, + final Time time) { + super(inner); this.inner = inner; this.metricScope = metricScope; this.time = time != null ? 
time : new SystemTime(); } @Override - public String name() { - return inner.name(); - } - - @Override public void init(ProcessorContext context, StateStore root) { final String name = name(); this.metrics = context.metrics(); @@ -72,16 +70,6 @@ class MeteredSegmentedBytesStore implements SegmentedBytesStore { } @Override - public boolean persistent() { - return inner.persistent(); - } - - @Override - public boolean isOpen() { - return inner.isOpen(); - } - - @Override public byte[] get(final Bytes key) { final long startNs = time.nanoseconds(); try { @@ -117,11 +105,6 @@ class MeteredSegmentedBytesStore implements SegmentedBytesStore { } @Override - public void close() { - inner.close(); - } - - @Override public void flush() { final long startNs = time.nanoseconds(); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java deleted file mode 100644 index c725c1a..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl; -import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.streams.state.WindowStoreIterator; - -public class MeteredWindowStore<K, V> implements WindowStore<K, V> { - - protected final WindowStore<K, V> inner; - protected final String metricScope; - protected final Time time; - - private Sensor putTime; - private Sensor fetchTime; - private Sensor flushTime; - private Sensor restoreTime; - private StreamsMetricsImpl metrics; - - private ProcessorContext context; - private StateStore root; - private Runnable initDelegate = new Runnable() { - @Override - public void run() { - inner.init(context, root); - } - }; - - private K key; - private V value; - private long timestamp; - private Runnable putDelegate = new Runnable() { - @Override - public void run() { - inner.put(key, value); - } - }; - private Runnable putTsDelegate = new Runnable() { - @Override - public void run() { - inner.put(key, value, timestamp); - } - }; - private Runnable flushDelegate = new Runnable() { - @Override - public void run() { - inner.flush(); - } - }; - - // always wrap the store with the metered store - public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, 
Time time) { - this.inner = inner; - this.metricScope = metricScope; - this.time = time != null ? time : Time.SYSTEM; - } - - @Override - public String name() { - return inner.name(); - } - - @Override - public void init(ProcessorContext context, StateStore root) { - final String name = name(); - this.context = context; - this.root = root; - this.metrics = (StreamsMetricsImpl) context.metrics(); - this.putTime = this.metrics.addLatencySensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG); - this.fetchTime = this.metrics.addLatencySensor(metricScope, name, "fetch", Sensor.RecordingLevel.DEBUG); - this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG); - this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG); - - // register and possibly restore the state from the logs - metrics.measureLatencyNs(time, initDelegate, this.restoreTime); - } - - @Override - public boolean persistent() { - return inner.persistent(); - } - - @Override - public boolean isOpen() { - return inner.isOpen(); - } - - @Override - public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) { - return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.fetchTime); - } - - @Override - public void put(K key, V value) { - this.key = key; - this.value = value; - metrics.measureLatencyNs(time, putDelegate, this.putTime); - } - - @Override - public void put(K key, V value, long timestamp) { - this.key = key; - this.value = value; - this.timestamp = timestamp; - metrics.measureLatencyNs(time, putTsDelegate, this.putTime); - } - - @Override - public void close() { - inner.close(); - } - - @Override - public void flush() { - metrics.measureLatencyNs(time, flushDelegate, this.flushTime); - } - - private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> { - - private final WindowStoreIterator<E> iter; - private final Sensor sensor; - private 
final long startNs; - - public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) { - this.iter = iter; - this.sensor = sensor; - this.startNs = time.nanoseconds(); - } - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public KeyValue<Long, E> next() { - return iter.next(); - } - - @Override - public void remove() { - iter.remove(); - } - - @Override - public void close() { - try { - iter.close(); - } finally { - metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds()); - } - } - - } - - WindowStore<K, V> inner() { - return inner; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java index 164b352..b72bbed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java @@ -35,28 +35,54 @@ import java.util.Map; public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> { - private final boolean enableCaching; + private static final String METRICS_SCOPE = "rocksdb-state"; + private final boolean cached; - public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean enableCaching) { - this(name, keySerde, valueSerde, null, logged, logConfig, enableCaching); + public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) { + this(name, keySerde, valueSerde, null, 
logged, logConfig, cached); } - public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, boolean enableCaching) { + public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, boolean cached) { super(name, keySerde, valueSerde, time, logged, logConfig); - this.enableCaching = enableCaching; + this.cached = cached; } public KeyValueStore get() { - if (!enableCaching) { - RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde); - return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "rocksdb-state", time); + if (!cached && !logged) { + return new MeteredKeyValueStore<>( + new RocksDBStore<>(name, keySerde, valueSerde), METRICS_SCOPE, time); + } + + // when cached, logged, or both we use a bytes store as the inner most store + final RocksDBStore<Bytes, byte[]> rocks = new RocksDBStore<>(name, + Serdes.Bytes(), + Serdes.ByteArray()); + + if (cached && logged) { + return new CachingKeyValueStore<>( + new MeteredKeyValueStore<>( + new ChangeLoggingKeyValueBytesStore(rocks), + METRICS_SCOPE, + time), + keySerde, + valueSerde); + } + + if (cached) { + return new CachingKeyValueStore<>( + new MeteredKeyValueStore<>(rocks, METRICS_SCOPE, time), + keySerde, + valueSerde); + + } else { + // logged + return new MeteredKeyValueStore<>( + new ChangeLoggingKeyValueStore<>(rocks, keySerde, valueSerde), + METRICS_SCOPE, + time); } - final RocksDBStore<Bytes, byte[]> store = new RocksDBStore<>(name, Serdes.Bytes(), Serdes.ByteArray()); - return new CachingKeyValueStore<>(new MeteredKeyValueStore<>(logged ? 
store.enableLogging() : store, - "rocksdb-state", - time), - keySerde, - valueSerde); } + + } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java index 7645472..10ebf65 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.state.SessionStore; @@ -35,13 +37,14 @@ import java.util.Map; public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> { private static final int NUM_SEGMENTS = 3; + public static final String METRIC_SCOPE = "rocksdb-session-store"; private final long retentionPeriod; - private final boolean enableCaching; + private final boolean cached; - public RocksDBSessionStoreSupplier(String name, long retentionPeriod, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean enableCaching) { + public RocksDBSessionStoreSupplier(String name, long retentionPeriod, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) { super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig); this.retentionPeriod = retentionPeriod; - this.enableCaching = enableCaching; + this.cached = cached; } public String name() { @@ -49,16 +52,41 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, } public SessionStore<K, V> get() { - final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore(name, + final SessionKeySchema keySchema = new SessionKeySchema(); + final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name, retentionPeriod, NUM_SEGMENTS, - new SessionKeySchema()); - final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(logged ? 
new ChangeLoggingSegmentedBytesStore(bytesStore) - : bytesStore, "rocksdb-session-store", time); - if (enableCaching) { - return new CachingSessionStore<>(metered, keySerde, valueSerde); + keySchema + ); + + if (cached && logged) { + final ChangeLoggingSegmentedBytesStore logged = new ChangeLoggingSegmentedBytesStore(segmented); + final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(logged, + METRIC_SCOPE, time); + final RocksDBSessionStore<Bytes, byte[]> sessionStore + = new RocksDBSessionStore<>(metered, Serdes.Bytes(), Serdes.ByteArray()); + + return new CachingSessionStore<>(sessionStore, keySerde, valueSerde); + } + + if (cached) { + final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(segmented, + METRIC_SCOPE, time); + final RocksDBSessionStore<Bytes, byte[]> sessionStore + = new RocksDBSessionStore<>(metered, Serdes.Bytes(), Serdes.ByteArray()); + + return new CachingSessionStore<>(sessionStore, keySerde, valueSerde); } - return new RocksDBSessionStore<>(metered, keySerde, valueSerde); + + if (logged) { + final ChangeLoggingSegmentedBytesStore logged = new ChangeLoggingSegmentedBytesStore(segmented); + final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(logged, + METRIC_SCOPE, time); + return new RocksDBSessionStore<>(metered, keySerde, valueSerde); + } + + return new RocksDBSessionStore<>( + new MeteredSegmentedBytesStore(segmented, METRIC_SCOPE, time), keySerde, valueSerde); } http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 8b838d0..3f8d509 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -92,24 +92,20 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { private WriteOptions wOptions; private FlushOptions fOptions; - private boolean loggingEnabled = false; - - private StoreChangeLogger<Bytes, byte[]> changeLogger; - protected volatile boolean open = false; - public KeyValueStore<K, V> enableLogging() { - loggingEnabled = true; - - return this; - } - public RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) { + public RocksDBStore(final String name, + final Serde<K> keySerde, + final Serde<V> valueSerde) { this(name, DB_FILE_DIR, keySerde, valueSerde); } - public RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) { + public RocksDBStore(final String name, + final String parentDir, + final Serde<K> keySerde, + final Serde<V> valueSerde) { this.name = name; this.parentDir = parentDir; this.keySerde = keySerde; @@ -159,10 +155,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { // open the DB dir openDB(context); - this.changeLogger = this.loggingEnabled ? 
new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null; // value getter should always read directly from rocksDB // since it is only for values that are already flushed - context.register(root, loggingEnabled, new StateRestoreCallback() { + context.register(root, false, new StateRestoreCallback() { @Override public void restore(byte[] key, byte[] value) { @@ -235,10 +230,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { byte[] rawKey = serdes.rawKey(key); byte[] rawValue = serdes.rawValue(value); putInternal(rawKey, rawValue); - - if (loggingEnabled) { - changeLogger.logChange(Bytes.wrap(rawKey), rawValue); - } } @Override @@ -278,9 +269,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } else { final byte[] value = serdes.rawValue(entry.value); batch.put(rawKey, value); - if (loggingEnabled) { - changeLogger.logChange(Bytes.wrap(rawKey), value); - } } } db.write(wOptions, batch); http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index a2a420e..80c4796 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; @@ -33,7 +34,6 @@ import java.util.NoSuchElementException; class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> { - private final String name; private final SegmentedBytesStore bytesStore; private final boolean retainDuplicates; private final Serde<K> keySerde; @@ -43,8 +43,15 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> { private StateSerdes<K, V> serdes; - RocksDBWindowStore(String name, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, final SegmentedBytesStore bytesStore) { - this.name = name; + static RocksDBWindowStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates) { + return new RocksDBWindowStore<>(inner, Serdes.Bytes(), Serdes.ByteArray(), retainDuplicates); + } + + + RocksDBWindowStore(final SegmentedBytesStore bytesStore, + final Serde<K> keySerde, + final Serde<V> valueSerde, + final boolean retainDuplicates) { this.keySerde = keySerde; this.valueSerde = valueSerde; this.retainDuplicates = retainDuplicates; @@ -54,7 +61,7 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> { @Override public String name() { - return name; + return bytesStore.name(); } @Override @@ -62,7 +69,7 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> { public void init(final ProcessorContext context, final StateStore root) { this.context = context; // construct the serde - this.serdes = new StateSerdes<>(name, + this.serdes = new StateSerdes<>(bytesStore.name(), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); @@ -147,6 +154,14 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> { public void close() { actual.close(); } + + @Override + public Long peekNextKey() { + if (!actual.hasNext()) { + throw new NoSuchElementException(); + } + return WindowStoreUtils.timestampFromBinaryKey(actual.peekNextKey().get()); + } } }