Repository: kafka Updated Branches: refs/heads/0.10.2 d64025911 -> 65c8f680e
HOTFIX: WindowedStreamPartitioner does not provide topic name to serializer Author: Matthias J. Sax <[email protected]> Reviewers: Eno Thereska <[email protected]>, Damian Guy <[email protected]>, Ismael Juma <[email protected]> Closes #2776 from mjsax/hotfix-window-serdes-0102 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65c8f680 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65c8f680 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65c8f680 Branch: refs/heads/0.10.2 Commit: 65c8f680ee4bf0efad82843736b1c63bc89c7d61 Parents: d640259 Author: Matthias J. Sax <[email protected]> Authored: Wed Apr 5 11:21:20 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed Apr 5 11:21:20 2017 +0100 ---------------------------------------------------------------------- .../streams/kstream/internals/KStreamImpl.java | 12 ++-- .../kstream/internals/SessionKeySerde.java | 17 +++--- .../internals/WindowedStreamPartitioner.java | 8 ++- .../apache/kafka/streams/state/StateSerdes.java | 61 +++++++++++--------- .../state/internals/CachingKeyValueStore.java | 3 +- .../state/internals/CachingSessionStore.java | 14 +++-- .../state/internals/CachingWindowStore.java | 7 ++- .../ChangeLoggingKeyValueBytesStore.java | 9 ++- .../internals/ChangeLoggingKeyValueStore.java | 3 +- .../ChangeLoggingSegmentedBytesStore.java | 9 ++- .../internals/InMemoryKeyValueLoggedStore.java | 10 ++-- .../InMemoryKeyValueStoreSupplier.java | 8 ++- .../streams/state/internals/MemoryLRUCache.java | 8 ++- .../MergedSortedCacheSessionStoreIterator.java | 4 +- .../internals/RocksDBSegmentedBytesStore.java | 3 + .../state/internals/RocksDBSessionStore.java | 14 +++-- .../streams/state/internals/RocksDBStore.java | 8 ++- .../state/internals/RocksDBWindowStore.java | 3 +- .../state/internals/SegmentedBytesStore.java | 8 +++ .../state/internals/SessionKeySchema.java | 10 +++- .../state/internals/WindowStoreKeySchema.java | 7 ++- .../state/internals/WindowStoreUtils.java | 5 +- .../kstream/internals/SessionKeySerdeTest.java | 10 ++-- .../WindowedStreamPartitionerTest.java | 2 +- .../streams/state/KeyValueStoreTestDriver.java | 8 +-- .../internals/CachingSessionStoreTest.java | 2 +- .../state/internals/CachingWindowStoreTest.java | 2 +- .../CompositeReadOnlyKeyValueStoreTest.java | 2 +- ...rgedSortedCacheSessionStoreIteratorTest.java | 2 +- .../RocksDBSegmentedBytesStoreTest.java | 4 +- .../state/internals/StateStoreTestUtils.java | 13 ++++- .../internals/WrappingStoreProviderTest.java | 4 +- 32 files changed, 176 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 0434f06..7b9fd94 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -349,16 +349,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @SuppressWarnings("unchecked") @Override - public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) { + public void to(final Serde<K> keySerde, final Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, final String topic) { Objects.requireNonNull(topic, "topic can't be null"); - String name = topology.newName(SINK_NAME); + final String name = topology.newName(SINK_NAME); - Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); - Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer(); + final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); + final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer(); if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { - WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; - partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer); + final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; + partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer); } topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java index 249350e..6909773 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java @@ -33,7 +33,6 @@ import java.util.Map; */ public class SessionKeySerde<K> implements Serde<Windowed<K>> { private static final int TIMESTAMP_SIZE = 8; - private static final String SESSIONKEY = "sessionkey"; private final Serde<K> keySerde; @@ -77,7 +76,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { if (data == null) { return null; } - return toBinary(data, keySerializer).get(); + return toBinary(data, keySerializer, topic).get(); } @Override @@ -102,7 +101,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { if (data == null || data.length == 0) { return null; } - return from(data, deserializer); + return from(data, deserializer, topic); } @@ -127,20 +126,20 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { return bytes; } - public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer) { - final K key = extractKey(binaryKey, keyDeserializer); + public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer, final String topic) { + final K key = extractKey(binaryKey, keyDeserializer, topic); final ByteBuffer buffer = ByteBuffer.wrap(binaryKey); final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE); final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE); return new Windowed<>(key, new SessionWindow(start, end)); } - private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer) { - return deserializer.deserialize(SESSIONKEY, extractKeyBytes(binaryKey)); + private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer, final String topic) { + return deserializer.deserialize(topic, extractKeyBytes(binaryKey)); } - public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer) { - final byte[] bytes = serializer.serialize(SESSIONKEY, sessionKey.key()); + public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer, final String topic) { + final byte[] bytes = serializer.serialize(topic, sessionKey.key()); ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE); buf.put(bytes); buf.putLong(sessionKey.window().end()); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java index ba9873b..ad54fcb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -24,9 +24,11 @@ import static org.apache.kafka.common.utils.Utils.toPositive; public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> { + private final String topic; private final WindowedSerializer<K> serializer; - public WindowedStreamPartitioner(WindowedSerializer<K> serializer) { + WindowedStreamPartitioner(final String topic, final WindowedSerializer<K> serializer) { + this.topic = topic; this.serializer = serializer; } @@ -40,8 +42,8 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window * @param numPartitions the total number of partitions * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used */ - public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) { - byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey); + public Integer partition(final Windowed<K> windowedKey, final V value, final int numPartitions) { + final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey); // hash the keyBytes to choose a partition return toPositive(Utils.murmur2(keyBytes)) % numPartitions; http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java index b19510c..059be37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -32,18 +32,21 @@ public final class StateSerdes<K, V> { /** * Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes. * - * @param stateName the name of the state - * @param keyClass the class of the key type - * @param valueClass the class of the value type - * @param <K> the key type - * @param <V> the value type - * @return a new instance of {@link StateSerdes} + * @param topic the topic name + * @param keyClass the class of the key type + * @param valueClass the class of the value type + * @param <K> the key type + * @param <V> the value type + * @return a new instance of {@link StateSerdes} */ - public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<K> keyClass, Class<V> valueClass) { - return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); + public static <K, V> StateSerdes<K, V> withBuiltinTypes( + final String topic, + final Class<K> keyClass, + final Class<V> valueClass) { + return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); } - private final String stateName; + private final String topic; private final Serde<K> keySerde; private final Serde<V> valueSerde; @@ -53,22 +56,26 @@ public final class StateSerdes<K, V> { * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not * need to provide the topic name any more. * - * @param stateName the name of the state - * @param keySerde the serde for keys; cannot be null - * @param valueSerde the serde for values; cannot be null + * @param topic the topic name + * @param keySerde the serde for keys; cannot be null + * @param valueSerde the serde for values; cannot be null * @throws IllegalArgumentException if key or value serde is null */ @SuppressWarnings("unchecked") - public StateSerdes(String stateName, - Serde<K> keySerde, - Serde<V> valueSerde) { - this.stateName = stateName; - - if (keySerde == null) + public StateSerdes(final String topic, + final Serde<K> keySerde, + final Serde<V> valueSerde) { + if (topic == null) { + throw new IllegalArgumentException("topic cannot be null"); + } + if (keySerde == null) { throw new IllegalArgumentException("key serde cannot be null"); - if (valueSerde == null) + } + if (valueSerde == null) { throw new IllegalArgumentException("value serde cannot be null"); + } + this.topic = topic; this.keySerde = keySerde; this.valueSerde = valueSerde; } @@ -128,12 +135,12 @@ public final class StateSerdes<K, V> { } /** - * Return the name of the state. + * Return the topic. * - * @return the name of the state + * @return the topic */ - public String stateName() { - return stateName; + public String topic() { + return topic; } /** @@ -143,7 +150,7 @@ public final class StateSerdes<K, V> { * @return the key as typed object */ public K keyFrom(byte[] rawKey) { - return keySerde.deserializer().deserialize(stateName, rawKey); + return keySerde.deserializer().deserialize(topic, rawKey); } /** @@ -153,7 +160,7 @@ public final class StateSerdes<K, V> { * @return the value as typed object */ public V valueFrom(byte[] rawValue) { - return valueSerde.deserializer().deserialize(stateName, rawValue); + return valueSerde.deserializer().deserialize(topic, rawValue); } /** @@ -163,7 +170,7 @@ public final class StateSerdes<K, V> { * @return the serialized key */ public byte[] rawKey(K key) { - return keySerde.serializer().serialize(stateName, key); + return keySerde.serializer().serialize(topic, key); } /** @@ -173,6 +180,6 @@ public final class StateSerdes<K, V> { * @return the serialized value */ public byte[] rawValue(V value) { - return valueSerde.serializer().serialize(stateName, value); + return valueSerde.serializer().serialize(topic, value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 1e91b47..f9f3077 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -69,7 +70,7 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K, @SuppressWarnings("unchecked") void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; - this.serdes = new StateSerdes<>(underlying.name(), + this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 6cfbf81..ed64246 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.internals.SessionKeySerde; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -43,6 +44,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState private StateSerdes<K, AGG> serdes; private ThreadCache cache; private CacheFlushListener<Windowed<K>, AGG> flushListener; + private String topic; CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore, final Serde<K> keySerde, @@ -58,7 +60,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState final long earliestSessionEndTime, final long latestSessionStartTime) { validateStoreOpen(); - final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(this.name(), key)); + final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, keySchema.lowerRange(binarySessionId, earliestSessionEndTime).get(), @@ -79,7 +81,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState public void put(final Windowed<K> key, AGG value) { validateStoreOpen(); - final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer()); + final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic); final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(), key.window().end(), context.partition(), context.topic()); cache.put(cacheName, binaryKey.get(), entry); @@ -92,6 +94,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { + topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()); bytesStore.init(context, root); initInternal((InternalProcessorContext) context); } @@ -100,7 +103,8 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState private void initInternal(final InternalProcessorContext context) { this.context = context; - this.serdes = new StateSerdes<>(bytesStore.name(), + keySchema.init(topic); + this.serdes = new StateSerdes<>(topic, keySerde == null ? (Serde<K>) context.keySerde() : keySerde, aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); @@ -123,12 +127,12 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState final RecordContext current = context.recordContext(); context.setRecordContext(entry.recordContext()); try { - final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer()); + final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic); if (flushListener != null) { final AGG newValue = serdes.valueFrom(entry.newValue()); final AGG oldValue = fetchPrevious(binaryKey); if (!(newValue == null && oldValue == null)) { - flushListener.apply(key, newValue == null ? null : newValue, oldValue); + flushListener.apply(key, newValue, oldValue); } } bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue()); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 33df426..37ce336 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; @@ -62,12 +63,13 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto public void init(final ProcessorContext context, final StateStore root) { underlying.init(context, root); initInternal(context); + keySchema.init(context.applicationId()); } @SuppressWarnings("unchecked") void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; - this.serdes = new StateSerdes<>(underlying.name(), + this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); @@ -157,8 +159,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator, underlyingIterator, - new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde())); - + new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde())); } http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index e31d04b..9ece123 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -37,7 +38,13 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractW @Override public void init(final ProcessorContext context, final StateStore root) { inner.init(context, root); - this.changeLogger = new StoreChangeLogger<>(inner.name(), context, WindowStoreUtils.INNER_SERDES); + this.changeLogger = new StoreChangeLogger<>( + inner.name(), + context, + WindowStoreUtils.getInnerStateSerde( + ProcessorStateManager.storeChangelogTopic( + context.applicationId(), + inner.name()))); } http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java index cd63d1a..d50f907 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -54,7 +55,7 @@ class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractWrapped public void init(final ProcessorContext context, final StateStore root) { innerBytes.init(context, root); - this.serdes = new StateSerdes<>(innerBytes.name(), + this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), innerBytes.name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); } http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java index 21c2866..2a5a1a1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; /** @@ -66,6 +67,12 @@ class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractWrapped @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { bytesStore.init(context, root); - changeLogger = new StoreChangeLogger<>(name(), context, WindowStoreUtils.INNER_SERDES); + changeLogger = new StoreChangeLogger<>( + name(), + context, + WindowStoreUtils.getInnerStateSerde( + ProcessorStateManager.storeChangelogTopic( + context.applicationId(), + name()))); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java index d81f6fb..f402390 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -35,7 +36,6 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> { private final String storeName; private StoreChangeLogger<K, V> changeLogger; - private ProcessorContext context; public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) { this.storeName = storeName; @@ -52,13 +52,13 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> { @Override @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { - this.context = context; inner.init(context, root); // construct the serde - StateSerdes<K, V> serdes = new StateSerdes<>(storeName, - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + StateSerdes<K, V> serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java index e00f8ab..05130ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -94,9 +95,10 @@ public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { // construct the serde - this.serdes = new StateSerdes<>(name, - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); // register the store context.register(root, true, new StateRestoreCallback() { http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index 4e1f40e..4ab6fab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -103,9 +104,10 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { // construct the serde - this.serdes = new StateSerdes<>(name, - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); // register the store context.register(root, true, new StateRestoreCallback() { http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index db64621..8ecc654 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -38,7 +38,7 @@ class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSorted MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator, final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator, final StateSerdes<K, AGG> serdes) { - super(cacheIterator, storeIterator, new StateSerdes<>(serdes.stateName(), + super(cacheIterator, storeIterator, new StateSerdes<>(serdes.topic(), new SessionKeySerde<>(serdes.keySerde()), serdes.valueSerde())); @@ -53,7 +53,7 @@ class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSorted @Override Windowed<K> deserializeCacheKey(final Bytes cacheKey) { - return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer()); + return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer(), rawSerdes.topic()); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index 31956ba..ef6ea3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import java.util.List; @@ -95,6 +96,8 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore { public void init(ProcessorContext context, StateStore root) { this.context = context; + keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name())); + segments.openExisting(context); // register and possibly restore the state from the logs http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index a8ddc73..22f4a9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StateSerdes; @@ -34,6 +35,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> { private final Serde<AGG> aggSerde; private final SegmentedBytesStore bytesStore; private StateSerdes<K, AGG> serdes; + protected String topic; RocksDBSessionStore(final SegmentedBytesStore bytesStore, @@ -55,12 +57,12 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> { @Override public void remove(final Windowed<K> key) { - bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer())); + bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer(), topic)); } @Override public void put(final Windowed<K> sessionKey, final AGG aggregate) { - bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer()), aggSerde.serializer().serialize(bytesStore.name(), aggregate)); + bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), aggSerde.serializer().serialize(bytesStore.name(), aggregate)); } @Override @@ -71,7 +73,9 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> { @Override @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { - this.serdes = new StateSerdes<>(bytesStore.name(), + final String storeName = bytesStore.name(); + topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); + this.serdes = new StateSerdes<>(topic, keySerde == null ? (Serde<K>) context.keySerde() : keySerde, aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); @@ -121,7 +125,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> { @Override public Windowed<K> peekNextKey() { final Bytes bytes = bytesIterator.peekNextKey(); - return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer()); + return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer(), serdes.topic()); } @Override @@ -132,7 +136,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> { @Override public KeyValue<Windowed<K>, AGG> next() { final KeyValue<Bytes, byte[]> next = bytesIterator.next(); - return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer()), serdes.valueFrom(next.value)); + return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 6bb14dd..acfb5b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.RocksDBConfigSetter; @@ -141,9 +142,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } // we need to construct the serde while opening DB since // it is also triggered by windowed DB segments without initialization - this.serdes = new StateSerdes<>(name, - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); this.dbDir = new File(new File(context.stateDir(), parentDir), this.name); this.db = openDB(this.dbDir, this.options, TTL_SECONDS); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 80c4796..ffd3061 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; @@ -69,7 +70,7 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> { public void init(final ProcessorContext context, final StateStore root) { this.context = context; // construct the serde - this.serdes = new StateSerdes<>(bytesStore.name(), + this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java index ab1099e..b2957da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java @@ -67,6 +67,14 @@ public interface SegmentedBytesStore extends StateStore { interface KeySchema { + + /** + * Initialized the schema with a topic. + * + * @param topic a topic name + */ + void init(final String topic); + /** * Given a record-key and a time, construct a Segmented key that represents * the upper range of keys to search when performing range queries. http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index 3747d0f..698970a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -27,17 +27,23 @@ import java.util.List; class SessionKeySchema implements SegmentedBytesStore.KeySchema { + private String topic; + + @Override + public void init(final String topic) { + this.topic = topic; + } @Override public Bytes upperRange(final Bytes key, final long to) { final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); - return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); + return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic); } @Override public Bytes lowerRange(final Bytes key, final long from) { final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); - return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); + return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java index a4d347c..9d32592 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java @@ -24,7 +24,12 @@ import org.apache.kafka.streams.state.StateSerdes; import java.util.List; class WindowStoreKeySchema implements RocksDBSegmentedBytesStore.KeySchema { - private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray()); + private StateSerdes<Bytes, byte[]> serdes; + + @Override + public void init(final String topic) { + serdes = new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()); + } @Override public Bytes upperRange(final Bytes key, final long to) { http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java index 074cf8a..0f491ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java @@ -34,9 +34,12 @@ public class WindowStoreUtils { /** Inner byte array serde used for segments */ public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes(); public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray(); - public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE); + static StateSerdes<Bytes, byte[]> getInnerStateSerde(final String topic) { + return new StateSerdes<>(topic, INNER_KEY_SERDE, INNER_VALUE_SERDE); + } + public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) { byte[] serializedKey = serdes.rawKey(key); return toBinaryKey(serializedKey, timestamp, seqnum); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java index 3ccdc2c..5c70c8a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java @@ -58,29 +58,29 @@ public class SessionKeySerdeTest { @Test public void shouldConvertToBinaryAndBack() throws Exception { final Windowed<String> key = new Windowed<>("key", new SessionWindow(10, 20)); - final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); - final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer()); + final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic"); + final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer(), "topic"); assertEquals(key, result); } @Test public void shouldExtractEndTimeFromBinary() throws Exception { final Windowed<String> key = new Windowed<>("key", new SessionWindow(10, 100)); - final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); + final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic"); assertEquals(100, SessionKeySerde.extractEnd(serialized.get())); } @Test public void shouldExtractStartTimeFromBinary() throws Exception { final Windowed<String> key = new Windowed<>("key", new SessionWindow(50, 100)); - final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); + final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic"); assertEquals(50, SessionKeySerde.extractStart(serialized.get())); } @Test public void shouldExtractKeyBytesFromBinary() throws Exception { final Windowed<String> key = new Windowed<>("blah", new SessionWindow(50, 100)); - final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); + final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic"); assertArrayEquals("blah".getBytes(), SessionKeySerde.extractKeyBytes(serialized.get())); } http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 0b6288f..efb3fd2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -70,7 +70,7 @@ public class WindowedStreamPartitionerTest { DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer); - WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer); + WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, windowedSerializer); for (int k = 0; k < 10; k++) { Integer key = rand.nextInt(); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 2f5b368..1f2665a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -170,10 +170,10 @@ public class KeyValueStoreTestDriver<K, V> { * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null * @return the test driver; never null */ - public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, - Serializer<V> valueSerializer, - Deserializer<V> valueDeserializer) { + public static <K, V> KeyValueStoreTestDriver<K, V> create(final Serializer<K> keySerializer, + final Deserializer<K> keyDeserializer, + final Serializer<V> valueSerializer, + final Deserializer<V> valueDeserializer) { StateSerdes<K, V> serdes = new StateSerdes<K, V>("unexpected", Serdes.serdeFrom(keySerializer, keyDeserializer), Serdes.serdeFrom(valueSerializer, valueDeserializer)); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 0475031..e12d693 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -117,7 +117,7 @@ public class CachingSessionStoreTest { assertEquals(added.size() - 1, cache.size()); final KeyValueIterator<Bytes, byte[]> iterator = underlying.fetch(Bytes.wrap(added.get(0).key.key().getBytes()), 0, 0); final KeyValue<Bytes, byte[]> next = iterator.next(); - assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer())); + assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "topic")); assertArrayEquals(serdes.rawValue(added.get(0).value), next.value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 11e605c..d50b551 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -162,7 +162,7 @@ public class CachingWindowStoreTest { @Test public void shouldIterateCacheAndStore() throws Exception { final Bytes key = Bytes.wrap("1" .getBytes()); - underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.INNER_SERDES)), "a".getBytes()); + underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("topic"))), "a".getBytes()); cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE); final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE); assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next()); http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index 0fd6001..579f129 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -59,7 +59,7 @@ public class CompositeReadOnlyKeyValueStoreTest { } private KeyValueStore<String, String> newStoreInstance() { - return StateStoreTestUtils.newKeyValueStore(storeName, String.class, String.class); + return StateStoreTestUtils.newKeyValueStore(storeName, "app-id", String.class, String.class); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java index e7c2eb3..3cc217d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java @@ -46,7 +46,7 @@ public class MergedSortedCacheSessionStoreIteratorTest { private final SessionWindow cacheWindow = new SessionWindow(10, 20); private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair( SessionKeySerde.toBinary( - new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer()), new LRUCacheEntry(cacheKey.getBytes()))) + new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "topic"), new LRUCacheEntry(cacheKey.getBytes()))) .iterator(); @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index 3763290..e9c8f89 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -155,7 +155,7 @@ public class RocksDBSegmentedBytesStoreTest { } private Bytes serializeKey(final Windowed<String> key) { - return SessionKeySerde.toBinary(key, Serdes.String().serializer()); + return SessionKeySerde.toBinary(key, Serdes.String().serializer(), "topic"); } private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) { @@ -163,7 +163,7 @@ public class RocksDBSegmentedBytesStoreTest { while (iterator.hasNext()) { final KeyValue<Bytes, byte[]> next = iterator.next(); final KeyValue<Windowed<String>, Long> deserialized - = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()), Serdes.Long().deserializer().deserialize("", next.value)); + = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "topic"), Serdes.Long().deserializer().deserialize("", next.value)); results.add(deserialized); } return results; http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java index 5fc9e1f..bb95827 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; @@ -28,7 +29,7 @@ import java.util.Collections; @SuppressWarnings("unchecked") public class StateStoreTestUtils { - public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, Class<K> keyType, Class<V> valueType) { + public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, String applicationId, Class<K> keyType, Class<V> valueType) { final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name, null, null, @@ -37,8 +38,14 @@ public class StateStoreTestUtils { Collections.<String, String>emptyMap()); final StateStore stateStore = supplier.get(); - stateStore.init(new MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType), - new NoOpRecordCollector()), stateStore); + stateStore.init( + new MockProcessorContext( + StateSerdes.withBuiltinTypes( + ProcessorStateManager.storeChangelogTopic(applicationId, name), + keyType, + valueType), + new NoOpRecordCollector()), + stateStore); return (KeyValueStore<K, V>) stateStore; } http://git-wip-us.apache.org/repos/asf/kafka/blob/65c8f680/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index 708e153..85270ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -42,9 +42,9 @@ public class WrappingStoreProviderTest { final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false); - stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class)); + stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class)); stubProviderOne.addStore("window", new NoOpWindowStore()); - stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class)); + stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class)); stubProviderTwo.addStore("window", new NoOpWindowStore()); wrappingStoreProvider = new WrappingStoreProvider(
