This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a8f2307 KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes Store and Memory LRU Caches (#6327) a8f2307 is described below commit a8f2307164ce0f1f47c458eee8f54173f7218a16 Author: A. Sophie Blee-Goldman <ableegold...@gmail.com> AuthorDate: Wed Feb 27 07:08:08 2019 -0800 KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes Store and Memory LRU Caches (#6327) Second PR in series to inline the generic parameters of the following bytes stores Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bbej...@gmail.com> --- .../org/apache/kafka/streams/state/Stores.java | 3 +- .../internals/ChangeLoggingKeyValueBytesStore.java | 2 +- .../streams/state/internals/MemoryLRUCache.java | 76 +++++++--------------- .../state/internals/MemoryNavigableLRUCache.java | 39 +++++------ .../state/internals/RocksDBSessionStore.java | 68 +++++++------------ .../state/internals/RocksDBWindowStore.java | 53 ++++++--------- .../RocksDbSessionBytesStoreSupplier.java | 3 +- .../internals/RocksDbWindowBytesStoreSupplier.java | 11 ++-- .../internals/WindowStoreIteratorWrapper.java | 47 ++++--------- .../internals/WrappedSessionStoreIterator.java | 17 ++--- .../state/internals/CachingSessionStoreTest.java | 2 +- .../state/internals/CachingWindowStoreTest.java | 4 +- .../state/internals/RocksDBSessionStoreTest.java | 35 ++++------ .../state/internals/RocksDBWindowStoreTest.java | 18 ----- 14 files changed, 126 insertions(+), 252 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 46a9d45..113e531 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.internals.ApiUtils; @@ -134,7 +133,7 @@ public class Stores { @Override public KeyValueStore<Bytes, byte[]> get() { - return new MemoryNavigableLRUCache<>(name, maxCacheSize, Serdes.Bytes(), Serdes.ByteArray()); + return new MemoryNavigableLRUCache(name, maxCacheSize); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 7567e78..aa931bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -44,7 +44,7 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore<KeyValueS // if the inner store is an LRU cache, add the eviction listener to log removed record if (wrapped() instanceof MemoryLRUCache) { - ((MemoryLRUCache<Bytes, byte[]>) wrapped()).setWhenEldestRemoved((key, value) -> { + ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> { // pass null to indicate removal changeLogger.logChange(key, null); }); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index f0c3c8c..d69df13 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -16,14 +16,12 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateSerdes; import java.util.LinkedHashMap; import java.util.List; @@ -32,46 +30,31 @@ import java.util.Objects; /** * An in-memory LRU cache store based on HashSet and HashMap. - * - * * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior. - * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class, - * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}. - * - * @param <K> The key type - * @param <V> The value type */ -public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { +public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> { - public interface EldestEntryRemovalListener<K, V> { - void apply(K key, V value); + public interface EldestEntryRemovalListener { + void apply(Bytes key, byte[] value); } - private final Serde<K> keySerde; - private final Serde<V> valueSerde; private final String name; - protected final Map<K, V> map; + protected final Map<Bytes, byte[]> map; - private StateSerdes<K, V> serdes; private boolean restoring = false; // TODO: this is a sub-optimal solution to avoid logging during restoration. // in the future we should augment the StateRestoreCallback with onComplete etc to better resolve this. private volatile boolean open = true; - private EldestEntryRemovalListener<K, V> listener; + private EldestEntryRemovalListener listener; - MemoryLRUCache(final String name, - final int maxCacheSize, - final Serde<K> keySerde, - final Serde<V> valueSerde) { + MemoryLRUCache(final String name, final int maxCacheSize) { this.name = name; - this.keySerde = keySerde; - this.valueSerde = valueSerde; // leave room for one extra entry to handle adding an entry before the oldest can be removed - this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) { + this.map = new LinkedHashMap<Bytes, byte[]>(maxCacheSize + 1, 1.01f, true) { private static final long serialVersionUID = 1L; @Override - protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) { + protected boolean removeEldestEntry(final Map.Entry<Bytes, byte[]> eldest) { final boolean evict = super.size() > maxCacheSize; if (evict && !restoring && listener != null) { listener.apply(eldest.getKey(), eldest.getValue()); @@ -81,7 +64,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { }; } - void setWhenEldestRemoved(final EldestEntryRemovalListener<K, V> listener) { + void setWhenEldestRemoved(final EldestEntryRemovalListener listener) { this.listener = listener; } @@ -91,24 +74,12 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { } @Override - @SuppressWarnings("unchecked") - public void init(final ProcessorContext context, - final StateStore root) { - // construct the serde - this.serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + public void init(final ProcessorContext context, final StateStore root) { // register the store context.register(root, (key, value) -> { restoring = true; - // check value for null, to avoid deserialization error. - if (value == null) { - delete(serdes.keyFrom(key)); - } else { - put(serdes.keyFrom(key), serdes.valueFrom(value)); - } + put(Bytes.wrap(key), value); restoring = false; }); } @@ -124,28 +95,26 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { } @Override - public synchronized V get(final K key) { + public synchronized byte[] get(final Bytes key) { Objects.requireNonNull(key); return this.map.get(key); } @Override - public synchronized void put(final K key, - final V value) { + public synchronized void put(final Bytes key, final byte[] value) { Objects.requireNonNull(key); if (value == null) { - this.map.remove(key); + delete(key); } else { this.map.put(key, value); } } @Override - public synchronized V putIfAbsent(final K key, - final V value) { + public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) { Objects.requireNonNull(key); - final V originalValue = get(key); + final byte[] originalValue = get(key); if (originalValue == null) { put(key, value); } @@ -153,14 +122,14 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { } @Override - public void putAll(final List<KeyValue<K, V>> entries) { - for (final KeyValue<K, V> entry : entries) { + public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { + for (final KeyValue<Bytes, byte[]> entry : entries) { put(entry.key, entry.value); } } @Override - public synchronized V delete(final K key) { + public synchronized byte[] delete(final Bytes key) { Objects.requireNonNull(key); return this.map.remove(key); } @@ -169,8 +138,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { * @throws UnsupportedOperationException at every invocation */ @Override - public KeyValueIterator<K, V> range(final K from, - final K to) { + public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) { throw new UnsupportedOperationException("MemoryLRUCache does not support range() function."); } @@ -178,7 +146,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { * @throws UnsupportedOperationException at every invocation */ @Override - public KeyValueIterator<K, V> all() { + public KeyValueIterator<Bytes, byte[]> all() { throw new UnsupportedOperationException("MemoryLRUCache does not support all() function."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java index d7b7b11..c3cc834 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueIterator; @@ -24,36 +24,37 @@ import java.util.Iterator; import java.util.Map; import java.util.TreeMap; -public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> { +public class MemoryNavigableLRUCache extends MemoryLRUCache { - - public MemoryNavigableLRUCache(final String name, final int maxCacheSize, final Serde<K> keySerde, final Serde<V> valueSerde) { - super(name, maxCacheSize, keySerde, valueSerde); + public MemoryNavigableLRUCache(final String name, final int maxCacheSize) { + super(name, maxCacheSize); } @Override - public KeyValueIterator<K, V> range(final K from, final K to) { - final TreeMap<K, V> treeMap = toTreeMap(); - return new DelegatingPeekingKeyValueIterator<>(name(), new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().subSet(from, true, to, true).iterator(), treeMap)); + public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) { + final TreeMap<Bytes, byte[]> treeMap = toTreeMap(); + return new DelegatingPeekingKeyValueIterator<>(name(), + new MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet() + .subSet(from, true, to, true).iterator(), treeMap)); } @Override - public KeyValueIterator<K, V> all() { - final TreeMap<K, V> treeMap = toTreeMap(); - return new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().iterator(), treeMap); + public KeyValueIterator<Bytes, byte[]> all() { + final TreeMap<Bytes, byte[]> treeMap = toTreeMap(); + return new MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet().iterator(), treeMap); } - private synchronized TreeMap<K, V> toTreeMap() { + private synchronized TreeMap<Bytes, byte[]> toTreeMap() { return new TreeMap<>(this.map); } - private static class CacheIterator<K, V> implements KeyValueIterator<K, V> { - private final Iterator<K> keys; - private final Map<K, V> entries; - private K lastKey; + private static class CacheIterator implements KeyValueIterator<Bytes, byte[]> { + private final Iterator<Bytes> keys; + private final Map<Bytes, byte[]> entries; + private Bytes lastKey; - public CacheIterator(final Iterator<K> keys, final Map<K, V> entries) { + private CacheIterator(final Iterator<Bytes> keys, final Map<Bytes, byte[]> entries) { this.keys = keys; this.entries = entries; } @@ -64,7 +65,7 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> { } @Override - public KeyValue<K, V> next() { + public KeyValue<Bytes, byte[]> next() { lastKey = keys.next(); return new KeyValue<>(lastKey, entries.get(lastKey)); } @@ -80,7 +81,7 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> { } @Override - public K peekNextKey() { + public Bytes peekNextKey() { throw new UnsupportedOperationException("peekNextKey not supported"); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index d855442..c9ca423 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -16,90 +16,66 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; -import org.apache.kafka.streams.state.StateSerdes; -public class RocksDBSessionStore<K, AGG> extends WrappedStateStore<SegmentedBytesStore> implements SessionStore<K, AGG> { +public class RocksDBSessionStore extends WrappedStateStore<SegmentedBytesStore> implements SessionStore<Bytes, byte[]> { - private final Serde<K> keySerde; - private final Serde<AGG> aggSerde; - - private StateSerdes<K, AGG> serdes; - private String topic; - - RocksDBSessionStore(final SegmentedBytesStore bytesStore, - final Serde<K> keySerde, - final Serde<AGG> aggSerde) { + RocksDBSessionStore(final SegmentedBytesStore bytesStore) { super(bytesStore); - this.keySerde = keySerde; - this.aggSerde = aggSerde; - } - - @Override - @SuppressWarnings("unchecked") - public void init(final ProcessorContext context, final StateStore root) { - final String storeName = name(); - topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); - - serdes = new StateSerdes<>( - topic, - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); - - super.init(context, root); } @Override - public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch( - Bytes.wrap(serdes.rawKey(key)), + key, earliestSessionEndTime, latestSessionStartTime ); - return new WrappedSessionStoreIterator<>(bytesIterator, serdes); + return new WrappedSessionStoreIterator(bytesIterator); } @Override - public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch( - Bytes.wrap(serdes.rawKey(keyFrom)), - Bytes.wrap(serdes.rawKey(keyTo)), + keyFrom, + keyTo, earliestSessionEndTime, latestSessionStartTime ); - return new WrappedSessionStoreIterator<>(bytesIterator, serdes); + return new WrappedSessionStoreIterator(bytesIterator); } @Override - public AGG fetchSession(final K key, final long startTime, final long endTime) { - return serdes.valueFrom(wrapped().get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)), startTime, endTime))); + public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + return wrapped().get(SessionKeySchema.toBinary(key, startTime, endTime)); } @Override - public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) { + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) { return findSessions(key, 0, Long.MAX_VALUE); } @Override - public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) { + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) { return findSessions(from, to, 0, Long.MAX_VALUE); } @Override - public void remove(final Windowed<K> key) { - wrapped().remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic))); + public void remove(final Windowed<Bytes> key) { + wrapped().remove(SessionKeySchema.toBinary(key)); } @Override - public void put(final Windowed<K> sessionKey, final AGG aggregate) { - wrapped().put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, serdes.keySerializer(), topic)), serdes.rawValue(aggregate)); + public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) { + wrapped().put(SessionKeySchema.toBinary(sessionKey), aggregate); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index c22ca52..44c9f79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -16,98 +16,85 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -public class RocksDBWindowStore<K, V> extends WrappedStateStore<SegmentedBytesStore> implements WindowStore<K, V> { +public class RocksDBWindowStore extends WrappedStateStore<SegmentedBytesStore> implements WindowStore<Bytes, byte[]> { - private final Serde<K> keySerde; - private final Serde<V> valueSerde; private final boolean retainDuplicates; private final long windowSize; private ProcessorContext context; - private StateSerdes<K, V> serdes; private int seqnum = 0; RocksDBWindowStore(final SegmentedBytesStore bytesStore, - final Serde<K> keySerde, - final Serde<V> valueSerde, final boolean retainDuplicates, final long windowSize) { super(bytesStore); - this.keySerde = keySerde; - this.valueSerde = valueSerde; this.retainDuplicates = retainDuplicates; this.windowSize = windowSize; } @Override - @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { this.context = context; - // construct the serde - serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); - super.init(context, root); } @Override - public void put(final K key, final V value) { + public void put(final Bytes key, final byte[] value) { put(key, value, context.timestamp()); } @Override - public void put(final K key, final V value, final long windowStartTimestamp) { + public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) { maybeUpdateSeqnumForDups(); - wrapped().put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum, serdes), serdes.rawValue(value)); + wrapped().put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum), value); } @Override - public V fetch(final K key, final long timestamp) { - final byte[] bytesValue = wrapped().get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes)); + public byte[] fetch(final Bytes key, final long timestamp) { + final byte[] bytesValue = wrapped().get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum)); if (bytesValue == null) { return null; } - return serdes.valueFrom(bytesValue); + return bytesValue; } @SuppressWarnings("deprecation") @Override - public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo); - return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).valuesIterator(); + public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(key, timeFrom, timeTo); + return new WindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator(); } @SuppressWarnings("deprecation") @Override - public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo); - return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, + final Bytes to, + final long timeFrom, + final long timeTo) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(from, to, timeFrom, timeTo); + return new WindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator(); } @Override - public KeyValueIterator<Windowed<K>, V> all() { + public KeyValueIterator<Windowed<Bytes>, byte[]> all() { final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().all(); - return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); + return new WindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator(); } @SuppressWarnings("deprecation") @Override - public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { + public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(timeFrom, timeTo); - return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); + return new WindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator(); } private void maybeUpdateSeqnumForDups() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java index e88755b..8f305db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; @@ -44,7 +43,7 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli retentionPeriod, segmentIntervalMs(), new SessionKeySchema()); - return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray()); + return new RocksDBSessionStore(segmented); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java index b9b7279..ecdfad2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; @@ -54,12 +53,10 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier segmentInterval, new WindowKeySchema() ); - return new RocksDBWindowStore<>(segmentedBytesStore, - Serdes.Bytes(), - Serdes.ByteArray(), - retainDuplicates, - windowSize); - + return new RocksDBWindowStore( + segmentedBytesStore, + retainDuplicates, + windowSize); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java index 1feab8f..4095445 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java @@ -20,39 +20,33 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStoreIterator; -class WindowStoreIteratorWrapper<K, V> { +class WindowStoreIteratorWrapper { private final KeyValueIterator<Bytes, byte[]> bytesIterator; - private final StateSerdes<K, V> serdes; private final long windowSize; WindowStoreIteratorWrapper(final KeyValueIterator<Bytes, byte[]> bytesIterator, - final StateSerdes<K, V> serdes, final long windowSize) { this.bytesIterator = bytesIterator; - this.serdes = serdes; this.windowSize = windowSize; } - public WindowStoreIterator<V> valuesIterator() { - return new WrappedWindowStoreIterator<>(bytesIterator, serdes); + public WindowStoreIterator<byte[]> valuesIterator() { + return new WrappedWindowStoreIterator(bytesIterator); } - public KeyValueIterator<Windowed<K>, V> keyValueIterator() { - return new WrappedKeyValueIterator<>(bytesIterator, serdes, windowSize); + public KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator() { + return new WrappedKeyValueIterator(bytesIterator, windowSize); } - private static class WrappedWindowStoreIterator<V> implements WindowStoreIterator<V> { + private static class WrappedWindowStoreIterator implements WindowStoreIterator<byte[]> { final KeyValueIterator<Bytes, byte[]> bytesIterator; - final StateSerdes<?, V> serdes; WrappedWindowStoreIterator( - final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<?, V> serdes) { + final KeyValueIterator<Bytes, byte[]> bytesIterator) { this.bytesIterator = bytesIterator; - this.serdes = serdes; } @Override @@ -66,11 +60,10 @@ class WindowStoreIteratorWrapper<K, V> { } @Override - public KeyValue<Long, V> next() { + public KeyValue<Long, byte[]> next() { final KeyValue<Bytes, byte[]> next = bytesIterator.next(); final long timestamp = WindowKeySchema.extractStoreTimestamp(next.key.get()); - final V value = serdes.valueFrom(next.value); - return KeyValue.pair(timestamp, value); + return KeyValue.pair(timestamp, next.value); } @Override @@ -84,25 +77,20 @@ class WindowStoreIteratorWrapper<K, V> { } } - private static class WrappedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> { + private static class WrappedKeyValueIterator implements KeyValueIterator<Windowed<Bytes>, byte[]> { final KeyValueIterator<Bytes, byte[]> bytesIterator; - final StateSerdes<K, V> serdes; final long windowSize; WrappedKeyValueIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, - final StateSerdes<K, V> serdes, final long windowSize) { this.bytesIterator = bytesIterator; - this.serdes = serdes; this.windowSize = windowSize; } @Override - public Windowed<K> peekNextKey() { + public Windowed<Bytes> peekNextKey() { final byte[] nextKey = bytesIterator.peekNextKey().get(); - final long timestamp = WindowKeySchema.extractStoreTimestamp(nextKey); - final K key = WindowKeySchema.extractStoreKey(nextKey, serdes); - return new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)); + return WindowKeySchema.fromStoreBytesKey(nextKey, windowSize); } @Override @@ -111,16 +99,9 @@ class WindowStoreIteratorWrapper<K, V> { } @Override - public KeyValue<Windowed<K>, V> next() { + public KeyValue<Windowed<Bytes>, byte[]> next() { final KeyValue<Bytes, byte[]> next = bytesIterator.next(); - final long timestamp = WindowKeySchema.extractStoreTimestamp(next.key.get()); - final K key = WindowKeySchema.extractStoreKey(next.key.get(), serdes); - final V value = serdes.valueFrom(next.value); - return KeyValue.pair( - new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)), - value - ); - + return KeyValue.pair(WindowKeySchema.fromStoreBytesKey(next.key.get(), windowSize), next.value); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java index ce27457..281297c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java @@ -20,17 +20,13 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.StateSerdes; -class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>, V> { +class WrappedSessionStoreIterator implements KeyValueIterator<Windowed<Bytes>, byte[]> { private final KeyValueIterator<Bytes, byte[]> bytesIterator; - private final StateSerdes<K, V> serdes; - WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, - final StateSerdes<K, V> serdes) { + WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator) { this.bytesIterator = bytesIterator; - this.serdes = serdes; } @Override @@ -39,9 +35,8 @@ class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>, } @Override - public Windowed<K> peekNextKey() { - final Bytes bytes = bytesIterator.peekNextKey(); - return SessionKeySchema.from(bytes.get(), serdes.keyDeserializer(), serdes.topic()); + public Windowed<Bytes> peekNextKey() { + return SessionKeySchema.from(bytesIterator.peekNextKey()); } @Override @@ -50,9 +45,9 @@ class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>, } @Override - public KeyValue<Windowed<K>, V> next() { + public KeyValue<Windowed<Bytes>, byte[]> next() { final KeyValue<Bytes, byte[]> next = bytesIterator.next(); - return KeyValue.pair(SessionKeySchema.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value)); + return KeyValue.pair(SessionKeySchema.from(next.key), next.value); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index a6f0a71..1cfdcd7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -71,7 +71,7 @@ public class CachingSessionStoreTest { public void setUp() { final SessionKeySchema schema = new SessionKeySchema(); final RocksDBSegmentedBytesStore underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema); - final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray()); + final RocksDBSessionStore sessionStore = new RocksDBSessionStore(underlying); cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index cb3fcd4..2bb758e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -83,10 +83,8 @@ public class CachingWindowStoreTest { public void setUp() { keySchema = new WindowKeySchema(); underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0, SEGMENT_INTERVAL, keySchema); - final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>( + final RocksDBWindowStore windowStore = new RocksDBWindowStore( underlying, - Serdes.Bytes(), - Serdes.ByteArray(), false, WINDOW_SIZE); cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 3653e7e..0786c37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; @@ -36,6 +37,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import static java.time.Duration.ofMillis; import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.valuesToList; import static org.hamcrest.CoreMatchers.equalTo; @@ -51,19 +53,12 @@ public class RocksDBSessionStoreTest { @Before public void before() { - final SessionKeySchema schema = new SessionKeySchema(); - - final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore( - "session-store", - "metrics-scope", - 10_000L, - 60_000L, - schema); - - sessionStore = new RocksDBSessionStore<>( - bytesStore, + sessionStore = Stores.sessionStoreBuilder( + Stores.persistentSessionStore( + "session-store", + ofMillis(10_000L)), Serdes.String(), - Serdes.Long()); + Serdes.Long()).build(); context = new InternalMockProcessorContext( TestUtils.tempDirectory(), @@ -74,6 +69,7 @@ public class RocksDBSessionStoreTest { new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); + sessionStore.init(context, sessionStore); } @@ -188,17 +184,12 @@ public class RocksDBSessionStoreTest { @Test public void shouldFetchExactKeys() { - final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore( - "session-store", - "metrics-scope", - 0x7a00000000000000L, - 0x7a00000000000000L, - new SessionKeySchema()); - - sessionStore = new RocksDBSessionStore<>( - bytesStore, + sessionStore = Stores.sessionStoreBuilder( + Stores.persistentSessionStore( + "session-store", + ofMillis(0x7a00000000000000L)), Serdes.String(), - Serdes.Long()); + Serdes.Long()).build(); sessionStore.init(context, sessionStore); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 32fa0f7..42b1b8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -1369,24 +1369,6 @@ public class RocksDBWindowStoreTest { } @Test - public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() { - windowStore = new RocksDBWindowStore<>( - new RocksDBSegmentedBytesStore( - windowName, - "metrics-scope", - retentionPeriod, - segmentInterval, - new WindowKeySchema()), - Serdes.Integer(), - new SerdeThatDoesntHandleNull(), - false, - windowSize); - windowStore.init(context, windowStore); - - assertNull(windowStore.fetch(1, 0)); - } - - @Test public void shouldFetchAndIterateOverExactBinaryKeys() { final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder( Stores.persistentWindowStore(windowName, ofMillis(60_000L), ofMillis(60_000L), true),