Repository: kafka Updated Branches: refs/heads/trunk 49f80b236 -> aea146511
HOTFIX: WindowedStreamPartitioner does not provide topic name to serializer Author: Matthias J. Sax <[email protected]> Reviewers: Eno Thereska <[email protected]>, Damian Guy <[email protected]>, Ismael Juma <[email protected]> Closes #2777 from mjsax/hotfix-window-serdes-trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aea14651 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aea14651 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aea14651 Branch: refs/heads/trunk Commit: aea14651184476940c69238535de5143e61f4c31 Parents: 49f80b2 Author: Matthias J. Sax <[email protected]> Authored: Wed Apr 5 10:31:06 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed Apr 5 10:31:06 2017 +0100 ---------------------------------------------------------------------- .../streams/kstream/internals/KStreamImpl.java | 12 ++--- .../kstream/internals/SessionKeySerde.java | 17 +++--- .../internals/WindowedStreamPartitioner.java | 8 +-- .../apache/kafka/streams/state/StateSerdes.java | 57 +++++++++++--------- .../state/internals/CachingKeyValueStore.java | 8 +-- .../state/internals/CachingSessionStore.java | 23 ++++---- .../state/internals/CachingWindowStore.java | 14 ++--- .../ChangeLoggingKeyValueBytesStore.java | 9 +++- .../internals/ChangeLoggingKeyValueStore.java | 7 +-- .../ChangeLoggingSegmentedBytesStore.java | 9 +++- .../internals/InMemoryKeyValueLoggedStore.java | 10 ++-- .../state/internals/InMemoryKeyValueStore.java | 8 +-- .../streams/state/internals/MemoryLRUCache.java | 8 +-- .../MergedSortedCacheSessionStoreIterator.java | 4 +- .../internals/RocksDBSegmentedBytesStore.java | 3 ++ .../state/internals/RocksDBSessionStore.java | 16 ++++-- .../internals/RocksDBSessionStoreSupplier.java | 6 +-- .../streams/state/internals/RocksDBStore.java | 8 +-- .../state/internals/RocksDBWindowStore.java | 7 +-- .../state/internals/SegmentedBytesStore.java | 8 +++ .../state/internals/SessionKeySchema.java | 10 +++- .../state/internals/WindowKeySchema.java | 7 ++- .../state/internals/WindowStoreUtils.java | 5 +- .../internals/WrappedSessionStoreIterator.java | 4 +- .../kstream/internals/SessionKeySerdeTest.java | 16 +++--- .../WindowedStreamPartitionerTest.java | 2 +- .../streams/state/KeyValueStoreTestDriver.java | 34 ++++++------ .../internals/CachingSessionStoreTest.java | 6 ++- .../state/internals/CachingWindowStoreTest.java | 2 +- .../CompositeReadOnlyKeyValueStoreTest.java | 2 +- ...gedSortedCacheKeyValueStoreIteratorTest.java | 2 +- ...tedCacheWrappedSessionStoreIteratorTest.java | 2 +- .../RocksDBSegmentedBytesStoreTest.java | 9 ++-- .../internals/RocksDBSessionStoreTest.java | 5 +- .../state/internals/SessionKeySchemaTest.java | 1 + .../state/internals/StateStoreTestUtils.java | 16 ++++-- .../state/internals/StoreChangeLoggerTest.java | 6 +-- .../internals/WrappingStoreProviderTest.java | 4 +- 38 files changed, 229 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 5eabc2c..bbd4ac4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -360,16 +360,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @SuppressWarnings("unchecked") @Override - public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) { + public void to(final Serde<K> keySerde, final Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, final String topic) { Objects.requireNonNull(topic, "topic can't be null"); - String name = topology.newName(SINK_NAME); + final String name = topology.newName(SINK_NAME); - Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); - Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer(); + final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); + final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer(); if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { - WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; - partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer); + final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; + partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer); } topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java index 7a85c77..3b57d95 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java @@ -33,7 +33,6 @@ import java.util.Map; */ public class SessionKeySerde<K> implements Serde<Windowed<K>> { private static final int TIMESTAMP_SIZE = 8; - private static final String SESSIONKEY = "sessionkey"; private final Serde<K> keySerde; @@ -77,7 +76,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { if (data == null) { return null; } - return toBinary(data, keySerializer).get(); + return toBinary(data, keySerializer, topic).get(); } @Override @@ -102,7 +101,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { if (data == null || data.length == 0) { return null; } - return from(data, deserializer); + return from(data, deserializer, topic); } @@ -133,8 +132,8 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { return bytes; } - public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer) { - final K key = extractKey(binaryKey, keyDeserializer); + public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer, final String topic) { + final K key = extractKey(binaryKey, keyDeserializer, topic); final Window window = extractWindow(binaryKey); return new Windowed<>(key, window); } @@ -147,12 +146,12 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), new SessionWindow(start, end)); } - private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer) { - return deserializer.deserialize(SESSIONKEY, extractKeyBytes(binaryKey)); + private static <K> K extractKey(final byte[] binaryKey, final Deserializer<K> deserializer, final String topic) { + return deserializer.deserialize(topic, extractKeyBytes(binaryKey)); } - public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer) { - final byte[] bytes = serializer.serialize(SESSIONKEY, sessionKey.key()); + public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer, final String topic) { + final byte[] bytes = serializer.serialize(topic, sessionKey.key()); ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE); buf.put(bytes); buf.putLong(sessionKey.window().end()); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java index f4a5e81..fa1ceae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -24,9 +24,11 @@ import static org.apache.kafka.common.utils.Utils.toPositive; public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> { + private final String topic; private final WindowedSerializer<K> serializer; - public WindowedStreamPartitioner(WindowedSerializer<K> serializer) { + WindowedStreamPartitioner(final String topic, final WindowedSerializer<K> serializer) { + this.topic = topic; this.serializer = serializer; } @@ -40,8 +42,8 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window * @param numPartitions the total number of partitions * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used */ - public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) { - byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey); + public Integer partition(final Windowed<K> windowedKey, final V value, final int numPartitions) { + final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey); // hash the keyBytes to choose a partition return toPositive(Utils.murmur2(keyBytes)) % numPartitions; http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java index 4366311..d43c613 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -32,18 +32,21 @@ public final class StateSerdes<K, V> { /** * Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes. * - * @param stateName the name of the state - * @param keyClass the class of the key type - * @param valueClass the class of the value type - * @param <K> the key type - * @param <V> the value type - * @return a new instance of {@link StateSerdes} + * @param topic the topic name + * @param keyClass the class of the key type + * @param valueClass the class of the value type + * @param <K> the key type + * @param <V> the value type + * @return a new instance of {@link StateSerdes} */ - public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<K> keyClass, Class<V> valueClass) { - return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); + public static <K, V> StateSerdes<K, V> withBuiltinTypes( + final String topic, + final Class<K> keyClass, + final Class<V> valueClass) { + return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); } - private final String stateName; + private final String topic; private final Serde<K> keySerde; private final Serde<V> valueSerde; @@ -53,22 +56,26 @@ public final class StateSerdes<K, V> { * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not * need to provide the topic name any more. * - * @param stateName the name of the state + * @param topic the topic name * @param keySerde the serde for keys; cannot be null * @param valueSerde the serde for values; cannot be null * @throws IllegalArgumentException if key or value serde is null */ @SuppressWarnings("unchecked") - public StateSerdes(String stateName, - Serde<K> keySerde, - Serde<V> valueSerde) { - this.stateName = stateName; - - if (keySerde == null) + public StateSerdes(final String topic, + final Serde<K> keySerde, + final Serde<V> valueSerde) { + if (topic == null) { + throw new IllegalArgumentException("topic cannot be null"); + } + if (keySerde == null) { throw new IllegalArgumentException("key serde cannot be null"); - if (valueSerde == null) + } + if (valueSerde == null) { throw new IllegalArgumentException("value serde cannot be null"); + } + this.topic = topic; this.keySerde = keySerde; this.valueSerde = valueSerde; } @@ -128,12 +135,12 @@ public final class StateSerdes<K, V> { } /** - * Return the name of the state. + * Return the topic. * - * @return the name of the state + * @return the topic */ - public String stateName() { - return stateName; + public String topic() { + return topic; } /** @@ -143,7 +150,7 @@ public final class StateSerdes<K, V> { * @return the key as typed object */ public K keyFrom(byte[] rawKey) { - return keySerde.deserializer().deserialize(stateName, rawKey); + return keySerde.deserializer().deserialize(topic, rawKey); } /** @@ -153,7 +160,7 @@ public final class StateSerdes<K, V> { * @return the value as typed object */ public V valueFrom(byte[] rawValue) { - return valueSerde.deserializer().deserialize(stateName, rawValue); + return valueSerde.deserializer().deserialize(topic, rawValue); } /** @@ -163,7 +170,7 @@ public final class StateSerdes<K, V> { * @return the serialized key */ public byte[] rawKey(K key) { - return keySerde.serializer().serialize(stateName, key); + return keySerde.serializer().serialize(topic, key); } /** @@ -173,6 +180,6 @@ public final class StateSerdes<K, V> { * @return the serialized value */ public byte[] rawValue(V value) { - return valueSerde.serializer().serialize(stateName, value); + return valueSerde.serializer().serialize(topic, value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index d9ef688..2a720be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -51,11 +52,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im this.valueSerde = valueSerde; } - @Override - public String name() { - return underlying.name(); - } - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, final StateStore root) { @@ -69,7 +65,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im @SuppressWarnings("unchecked") private void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; - this.serdes = new StateSerdes<>(underlying.name(), + this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index a4b46ff..bebd118 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.internals.SessionKeySerde; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -43,6 +44,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i private StateSerdes<K, AGG> serdes; private InternalProcessorContext context; private CacheFlushListener<Windowed<K>, AGG> flushListener; + private String topic; CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore, final Serde<K> keySerde, @@ -56,6 +58,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { + topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()); bytesStore.init(context, root); initInternal((InternalProcessorContext) context); } @@ -64,13 +67,15 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i private void initInternal(final InternalProcessorContext context) { this.context = context; - this.serdes = new StateSerdes<>(bytesStore.name(), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); + keySchema.init(topic); + serdes = new StateSerdes<>( + topic, + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); - this.cacheName = context.taskId() + "-" + bytesStore.name(); - this.cache = this.context.getCache(); + cacheName = context.taskId() + "-" + bytesStore.name(); + cache = context.getCache(); cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List<ThreadCache.DirtyEntry> entries) { @@ -85,7 +90,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i final long earliestSessionEndTime, final long latestSessionStartTime) { validateStoreOpen(); - final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(this.name(), key)); + final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, keySchema.lowerRange(binarySessionId, earliestSessionEndTime), keySchema.upperRange(binarySessionId, latestSessionStartTime)); @@ -106,7 +111,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @Override public void put(final Windowed<K> key, AGG value) { validateStoreOpen(); - final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer()); + final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic); final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(), key.window().end(), context.partition(), context.topic()); cache.put(cacheName, binaryKey, entry); @@ -122,12 +127,12 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i final RecordContext current = context.recordContext(); context.setRecordContext(entry.recordContext()); try { - final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer()); + final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic); if (flushListener != null) { final AGG newValue = serdes.valueFrom(entry.newValue()); final AGG oldValue = fetchPrevious(binaryKey); if (!(newValue == null && oldValue == null)) { - flushListener.apply(key, newValue == null ? null : newValue, oldValue); + flushListener.apply(key, newValue, oldValue); } } bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue()); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 4003e54..f492573 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -62,17 +63,18 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl public void init(final ProcessorContext context, final StateStore root) { underlying.init(context, root); initInternal(context); + keySchema.init(context.applicationId()); } @SuppressWarnings("unchecked") private void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; - this.serdes = new StateSerdes<>(underlying.name(), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); - this.name = context.taskId() + "-" + underlying.name(); - this.cache = this.context.getCache(); + name = context.taskId() + "-" + underlying.name(); + cache = this.context.getCache(); cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() { @Override @@ -161,7 +163,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator, underlyingIterator, - new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde())); + new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde())); } private V fetchPrevious(final Bytes key, final long timestamp) { http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index f5ad3ac..8dc457a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -37,7 +38,13 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS @Override public void init(final ProcessorContext context, final StateStore root) { inner.init(context, root); - this.changeLogger = new StoreChangeLogger<>(inner.name(), context, WindowStoreUtils.INNER_SERDES); + this.changeLogger = new StoreChangeLogger<>( + inner.name(), + context, + WindowStoreUtils.getInnerStateSerde( + ProcessorStateManager.storeChangelogTopic( + context.applicationId(), + inner.name()))); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java index c60278f..ea9f7aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -55,9 +56,9 @@ class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateSt public void init(final ProcessorContext context, final StateStore root) { innerBytes.init(context, root); - this.serdes = new StateSerdes<>(innerBytes.name(), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), innerBytes.name()), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java index 3082426..d23e115 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; /** @@ -65,6 +66,12 @@ class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractStateSt @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { bytesStore.init(context, root); - changeLogger = new StoreChangeLogger<>(name(), context, WindowStoreUtils.INNER_SERDES); + changeLogger = new StoreChangeLogger<>( + name(), + context, + WindowStoreUtils.getInnerStateSerde( + ProcessorStateManager.storeChangelogTopic( + context.applicationId(), + bytesStore.name()))); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java index bcc9819..638caad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -33,7 +34,6 @@ public class InMemoryKeyValueLoggedStore<K, V> extends WrappedStateStore.Abstrac private final Serde<V> valueSerde; private StoreChangeLogger<K, V> changeLogger; - private ProcessorContext context; InMemoryKeyValueLoggedStore(final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) { super(inner); @@ -45,13 +45,13 @@ public class InMemoryKeyValueLoggedStore<K, V> extends WrappedStateStore.Abstrac @Override @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { - this.context = context; inner.init(context, root); // construct the serde - StateSerdes<K, V> serdes = new StateSerdes<>(inner.name(), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + StateSerdes<K, V> serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); this.changeLogger = new StoreChangeLogger<>(inner.name(), context, serdes); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index f63d2f1..41c6de3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -64,9 +65,10 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> { @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { // construct the serde - this.serdes = new StateSerdes<>(name, - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); if (root != null) { // register the store http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index cf78165..e6bba54 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -103,9 +104,10 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { // construct the serde - this.serdes = new StateSerdes<>(name, - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); // register the store context.register(root, true, new StateRestoreCallback() { http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index 72b73ed..3f9b620 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -36,7 +36,7 @@ class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSorted MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator, final StateSerdes<K, AGG> serdes) { - super(cacheIterator, storeIterator, new StateSerdes<>(serdes.stateName(), + super(cacheIterator, storeIterator, new StateSerdes<>(serdes.topic(), new SessionKeySerde<>(serdes.keySerde()), serdes.valueSerde())); @@ -51,7 +51,7 @@ class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSorted @Override Windowed<K> deserializeCacheKey(final Bytes cacheKey) { - return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer()); + return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer(), rawSerdes.topic()); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index a9d9259..252a55f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import java.util.List; @@ -89,6 +90,8 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore { public void init(ProcessorContext context, StateStore root) { this.context = context; + keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name())); + segments.openExisting(context); // register and possibly restore the state from the logs http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index cdd0dd7..5027781 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StateSerdes; @@ -35,6 +36,7 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i protected final SegmentedBytesStore bytesStore; protected StateSerdes<K, AGG> serdes; + protected String topic; // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs private static class RocksDBSessionBytesStore extends RocksDBSessionStore<Bytes, byte[]> { @@ -75,9 +77,13 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @Override @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { - this.serdes = new StateSerdes<>(bytesStore.name(), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); + final String storeName = bytesStore.name(); + topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); + + serdes = new StateSerdes<>( + topic, + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); bytesStore.init(context, root); } @@ -95,11 +101,11 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @Override public void remove(final Windowed<K> key) { - bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer())); + bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer(), topic)); } @Override public void put(final Windowed<K> sessionKey, final AGG aggregate) { - bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer()), aggSerde.serializer().serialize(bytesStore.name(), aggregate)); + bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), aggSerde.serializer().serialize(topic, aggregate)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java index 6743a7e..4e618d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java @@ -51,9 +51,9 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, public SessionStore<K, V> get() { final SessionKeySchema keySchema = new SessionKeySchema(); final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name, - retentionPeriod, - NUM_SEGMENTS, - keySchema); + retentionPeriod, + NUM_SEGMENTS, + keySchema); if (cached && logged) { final ChangeLoggingSegmentedBytesStore logged = new ChangeLoggingSegmentedBytesStore(segmented); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 932ddd2..c879b91 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.RocksDBConfigSetter; @@ -143,9 +144,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } // we need to construct the serde while opening DB since // it is also triggered by windowed DB segments without initialization - this.serdes = new StateSerdes<>(name, - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); this.dbDir = new File(new File(context.stateDir(), parentDir), this.name); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index b82e416..5e8d0b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; @@ -77,9 +78,9 @@ class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl public void init(final ProcessorContext context, final StateStore root) { this.context = context; // construct the serde - this.serdes = new StateSerdes<>(bytesStore.name(), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); bytesStore.init(context, root); } http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java index 622ed08..0c3bb53 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java @@ -66,6 +66,14 @@ public interface SegmentedBytesStore extends StateStore { byte[] get(Bytes key); interface KeySchema { + + /** + * Initialized the schema with a topic. + * + * @param topic a topic name + */ + void init(final String topic); + /** * Given a record-key and a time, construct a Segmented key that represents * the upper range of keys to search when performing range queries. http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index 7d6761c..80785b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -27,17 +27,23 @@ import java.util.List; class SessionKeySchema implements SegmentedBytesStore.KeySchema { + private String topic; + + @Override + public void init(final String topic) { + this.topic = topic; + } @Override public Bytes upperRange(final Bytes key, final long to) { final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); - return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); + return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic); } @Override public Bytes lowerRange(final Bytes key, final long from) { final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); - return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); + return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java index 76faf0e..b9a8665 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java @@ -24,7 +24,12 @@ import org.apache.kafka.streams.state.StateSerdes; import java.util.List; class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { - private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray()); + private StateSerdes<Bytes, byte[]> serdes; + + @Override + public void init(final String topic) { + serdes = new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()); + } @Override public Bytes upperRange(final Bytes key, final long to) { http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java index b93e39a..faf2899 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java @@ -31,7 +31,10 @@ public class WindowStoreUtils { /** Inner byte array serde used for segments */ static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes(); static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray(); - static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE); + + static StateSerdes<Bytes, byte[]> getInnerStateSerde(final String topic) { + return new StateSerdes<>(topic, INNER_KEY_SERDE, INNER_VALUE_SERDE); + } static <K> Bytes toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) { byte[] serializedKey = serdes.rawKey(key); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java index 819f263..6fd9636 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java @@ -66,7 +66,7 @@ class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>, @Override public Windowed<K> peekNextKey() { final Bytes bytes = bytesIterator.peekNextKey(); - return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer()); + return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer(), serdes.topic()); } @Override @@ -77,7 +77,7 @@ class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>, @Override public KeyValue<Windowed<K>, V> next() { final KeyValue<Bytes, byte[]> next = bytesIterator.next(); - return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer()), serdes.valueFrom(next.value)); + return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java index 65c4d9f..aca3352 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java @@ -62,39 +62,39 @@ public class SessionKeySerdeTest { @Test public void shouldConvertToBinaryAndBack() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); - final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); + final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer(), "dummy"); assertEquals(windowedKey, result); } @Test public void shouldExtractEndTimeFromBinary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); assertEquals(endTime, SessionKeySerde.extractEnd(serialized.get())); } @Test public void shouldExtractStartTimeFromBinary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); assertEquals(startTime, SessionKeySerde.extractStart(serialized.get())); } @Test public void shouldExtractWindowFromBindary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); assertEquals(window, SessionKeySerde.extractWindow(serialized.get())); } @Test public void shouldExtractKeyBytesFromBinary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); assertArrayEquals(key.getBytes(), SessionKeySerde.extractKeyBytes(serialized.get())); } @Test public void shouldExtractKeyFromBinary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); - assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer())); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); + assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer(), "dummy")); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 3af50d8..316494d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -69,7 +69,7 @@ public class WindowedStreamPartitionerTest { DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer); - WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer); + WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, windowedSerializer); for (int k = 0; k < 10; k++) { Integer key = rand.nextInt(); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 08adbf4..b758799 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -16,16 +16,6 @@ */ package org.apache.kafka.streams.state; -import java.io.File; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.Set; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.metrics.JmxReporter; @@ -54,6 +44,17 @@ import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + /** * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes. @@ -170,11 +171,12 @@ public class KeyValueStoreTestDriver<K, V> { * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null * @return the test driver; never null */ - public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, - Serializer<V> valueSerializer, - Deserializer<V> valueDeserializer) { - StateSerdes<K, V> serdes = new StateSerdes<K, V>("unexpected", + public static <K, V> KeyValueStoreTestDriver<K, V> create(final Serializer<K> keySerializer, + final Deserializer<K> keyDeserializer, + final Serializer<V> valueSerializer, + final Deserializer<V> valueDeserializer) { + StateSerdes<K, V> serdes = new StateSerdes<K, V>( + "unexpected", Serdes.serdeFrom(keySerializer, keyDeserializer), Serdes.serdeFrom(valueSerializer, valueDeserializer)); return new KeyValueStoreTestDriver<K, V>(serdes); @@ -234,7 +236,7 @@ public class KeyValueStoreTestDriver<K, V> { this.stateDir.mkdirs(); props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationId"); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 59caaaf..7377ba2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -58,7 +58,9 @@ public class CachingSessionStoreTest { @Before public void setUp() throws Exception { - underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, new SessionKeySchema()); + final SessionKeySchema schema = new SessionKeySchema(); + schema.init("topic"); + underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, schema); final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray()); cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), @@ -116,7 +118,7 @@ public class CachingSessionStoreTest { assertEquals(added.size() - 1, cache.size()); final KeyValueIterator<Bytes, byte[]> iterator = underlying.fetch(Bytes.wrap(added.get(0).key.key().getBytes()), 0, 0); final KeyValue<Bytes, byte[]> next = iterator.next(); - assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer())); + assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy")); assertArrayEquals(serdes.rawValue(added.get(0).value), next.value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 297a88e..054e685 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -160,7 +160,7 @@ public class CachingWindowStoreTest { @Test public void shouldIterateCacheAndStore() throws Exception { final Bytes key = Bytes.wrap("1" .getBytes()); - underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.INNER_SERDES), "a".getBytes()); + underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")), "a".getBytes()); cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE); final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE); assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next()); http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index e19c4ef..2e5b872 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -61,7 +61,7 @@ public class CompositeReadOnlyKeyValueStoreTest { } private KeyValueStore<String, String> newStoreInstance() { - return StateStoreTestUtils.newKeyValueStore(storeName, String.class, String.class); + return StateStoreTestUtils.newKeyValueStore(storeName, "app-id", String.class, String.class); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java index 3f05428..6e0059f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java @@ -32,7 +32,7 @@ import static org.junit.Assert.assertFalse; public class MergedSortedCacheKeyValueStoreIteratorTest { private final String namespace = "one"; - private final StateSerdes<byte[], byte[]> serdes = new StateSerdes<>(namespace, Serdes.ByteArray(), Serdes.ByteArray()); + private final StateSerdes<byte[], byte[]> serdes = new StateSerdes<>("dummy", Serdes.ByteArray(), Serdes.ByteArray()); private KeyValueStore<Bytes, byte[]> store; private ThreadCache cache; http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java index 5f24fde..d3d8f40 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java @@ -45,7 +45,7 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest { private final SessionWindow cacheWindow = new SessionWindow(10, 20); private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair( SessionKeySerde.toBinary( - new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer()), new LRUCacheEntry(cacheKey.getBytes()))) + new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "dummy"), new LRUCacheEntry(cacheKey.getBytes()))) .iterator(); @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index ab13c24..bd335d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -54,11 +54,12 @@ public class RocksDBSegmentedBytesStoreTest { @Before public void before() { - + final SessionKeySchema schema = new SessionKeySchema(); + schema.init("topic"); bytesStore = new RocksDBSegmentedBytesStore(storeName, retention, numSegments, - new SessionKeySchema()); + schema); stateDir = TestUtils.tempDirectory(); final MockProcessorContext context = new MockProcessorContext(stateDir, @@ -154,7 +155,7 @@ public class RocksDBSegmentedBytesStoreTest { } private Bytes serializeKey(final Windowed<String> key) { - return SessionKeySerde.toBinary(key, Serdes.String().serializer()); + return SessionKeySerde.toBinary(key, Serdes.String().serializer(), "dummy"); } private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) { @@ -162,7 +163,7 @@ public class RocksDBSegmentedBytesStoreTest { while (iterator.hasNext()) { final KeyValue<Bytes, byte[]> next = iterator.next(); final KeyValue<Windowed<String>, Long> deserialized - = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()), Serdes.Long().deserializer().deserialize("", next.value)); + = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy"), Serdes.Long().deserializer().deserialize("", next.value)); results.add(deserialized); } return results; http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 9be7c10..7f01aae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -48,8 +48,11 @@ public class RocksDBSessionStoreTest { @Before public void before() { + final SessionKeySchema schema = new SessionKeySchema(); + schema.init("topic"); + final RocksDBSegmentedBytesStore bytesStore = - new RocksDBSegmentedBytesStore("session-store", 10000L, 3, new SessionKeySchema()); + new RocksDBSegmentedBytesStore("session-store", 10000L, 3, schema); sessionStore = new RocksDBSessionStore<>(bytesStore, Serdes.String(), http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java index 7c085dd..354cf01 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java @@ -40,6 +40,7 @@ public class SessionKeySchemaTest { @Before public void before() { + sessionKeySchema.init("topic"); final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1), KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2), KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3), http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java index 39cc5b5..d30372f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; @@ -28,7 +29,10 @@ import java.util.Collections; @SuppressWarnings("unchecked") public class StateStoreTestUtils { - public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, Class<K> keyType, Class<V> valueType) { + public static <K, V> KeyValueStore<K, V> newKeyValueStore(final String name, + final String applicationId, + final Class<K> keyType, + final Class<V> valueType) { final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name, null, null, @@ -37,8 +41,14 @@ public class StateStoreTestUtils { Collections.<String, String>emptyMap()); final StateStore stateStore = supplier.get(); - stateStore.init(new MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType), - new NoOpRecordCollector()), stateStore); + stateStore.init( + new MockProcessorContext( + StateSerdes.withBuiltinTypes( + ProcessorStateManager.storeChangelogTopic(applicationId, name), + keyType, + valueType), + new NoOpRecordCollector()), + stateStore); return (KeyValueStore<K, V>) stateStore; } http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java index 6edda74..311eaf6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -17,9 +17,6 @@ package org.apache.kafka.streams.state.internals; -import java.util.HashMap; -import java.util.Map; - import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; @@ -27,6 +24,9 @@ import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; http://git-wip-us.apache.org/repos/asf/kafka/blob/aea14651/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index 76a8747..9918672 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -42,9 +42,9 @@ public class WrappingStoreProviderTest { final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false); - stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class)); + stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class)); stubProviderOne.addStore("window", new NoOpWindowStore()); - stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class)); + stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class)); stubProviderTwo.addStore("window", new NoOpWindowStore()); wrappingStoreProvider = new WrappingStoreProvider(
