This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 04e2061 KAFKA-3522: Add TimestampedWindowStore builder/runtime classes (#6173) 04e2061 is described below commit 04e206154ac614b7d4d34a7a1b6ba2c882f607b9 Author: Matthias J. Sax <mj...@apache.org> AuthorDate: Fri Mar 8 09:30:00 2019 -0800 KAFKA-3522: Add TimestampedWindowStore builder/runtime classes (#6173) Add TimestampedWindowStore builder/runtime classes Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../internals/InternalTopologyBuilder.java | 7 +- .../processor/internals/ProcessorContextImpl.java | 31 +++++- .../state/internals/CachingWindowStore.java | 47 +++++---- ...ChangeLoggingTimestampedKeyValueBytesStore.java | 1 + ... ChangeLoggingTimestampedWindowBytesStore.java} | 12 ++- .../internals/ChangeLoggingWindowBytesStore.java | 48 ++++++--- .../internals/MeteredTimestampedWindowStore.java | 58 +++++++++++ .../state/internals/MeteredWindowStore.java | 20 ++-- .../internals/TimestampedWindowStoreBuilder.java | 74 ++++++++++++++ .../internals/ProcessorContextImplTest.java | 112 +++++++++++++++++---- ...ngeLoggingTimestampedWindowBytesStoreTest.java} | 61 +++++++---- .../ChangeLoggingWindowBytesStoreTest.java | 31 +++--- .../internals/MeteredTimestampWindowStoreTest.java | 92 +++++++++++++++++ 13 files changed, 486 insertions(+), 108 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 0648fec..334adce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +141,8 @@ public class InternalTopologyBuilder { long retentionPeriod() { if (builder instanceof WindowStoreBuilder) { return ((WindowStoreBuilder) builder).retentionPeriod(); + } else if (builder instanceof TimestampedWindowStoreBuilder) { + return ((TimestampedWindowStoreBuilder) builder).retentionPeriod(); } else if (builder instanceof SessionStoreBuilder) { return ((SessionStoreBuilder) builder).retentionPeriod(); } else { @@ -160,7 +163,9 @@ public class InternalTopologyBuilder { } private boolean isWindowStore() { - return builder instanceof WindowStoreBuilder || builder instanceof SessionStoreBuilder; + return builder instanceof WindowStoreBuilder + || builder instanceof TimestampedWindowStoreBuilder + || builder instanceof SessionStoreBuilder; } // Apparently Java strips the generics from this method because we're using the raw type for builder, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 36a3750..764d50c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -32,6 +32,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -84,6 +86,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re if (global != null) { if (global instanceof KeyValueStore) { return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global); + } else if (global instanceof TimestampedWindowStore) { + return new TimestampedWindowStoreReadOnlyDecorator((TimestampedWindowStore) global); } else if (global instanceof WindowStore) { return new WindowStoreReadOnlyDecorator((WindowStore) global); } else if (global instanceof SessionStore) { @@ -106,6 +110,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re final StateStore store = stateManager.getStore(name); if (store instanceof KeyValueStore) { return new KeyValueStoreReadWriteDecorator((KeyValueStore) store); + } else if (store instanceof TimestampedWindowStore) { + return new TimestampedWindowStoreReadWriteDecorator((TimestampedWindowStore) store); } else if (store instanceof WindowStore) { return new WindowStoreReadWriteDecorator((WindowStore) store); } else if (store instanceof SessionStore) { @@ -339,6 +345,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } } + private static class TimestampedWindowStoreReadOnlyDecorator<K, V> + extends WindowStoreReadOnlyDecorator<K, ValueAndTimestamp<V>> + implements TimestampedWindowStore<K, V> { + + private TimestampedWindowStoreReadOnlyDecorator(final TimestampedWindowStore<K, V> inner) { + super(inner); + } + } + private static class SessionStoreReadOnlyDecorator<K, AGG> extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>, K, AGG> implements SessionStore<K, AGG> { @@ -520,6 +535,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } } + private static class TimestampedWindowStoreReadWriteDecorator<K, V> + extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>> + implements TimestampedWindowStore<K, V> { + + TimestampedWindowStoreReadWriteDecorator(final TimestampedWindowStore<K, V> inner) { + super(inner); + } + } + static class SessionStoreReadWriteDecorator<K, AGG> extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG> implements SessionStore<K, AGG> { @@ -549,12 +573,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } @Override - public void put(final Windowed<K> sessionKey, final AGG aggregate) { + public void put(final Windowed<K> sessionKey, + final AGG aggregate) { wrapped().put(sessionKey, aggregate); } @Override - public AGG fetchSession(final K key, final long startTime, final long endTime) { + public AGG fetchSession(final K key, + final long startTime, + final long endTime) { return wrapped().fetchSession(key, startTime, endTime); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 50a2c7c..0a869da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -64,9 +64,10 @@ class CachingWindowStore this.context = context; final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); - bytesSerdes = new StateSerdes<>(topic, - Serdes.Bytes(), - Serdes.ByteArray()); + bytesSerdes = new StateSerdes<>( + topic, + Serdes.Bytes(), + Serdes.ByteArray()); name = context.taskId() + "-" + name(); cache = this.context.getCache(); @@ -121,12 +122,15 @@ class CachingWindowStore } @Override - public synchronized void put(final Bytes key, final byte[] value) { + public synchronized void put(final Bytes key, + final byte[] value) { put(key, value, context.timestamp()); } @Override - public synchronized void put(final Bytes key, final byte[] value, final long windowStartTimestamp) { + public synchronized void put(final Bytes key, + final byte[] value, + final long windowStartTimestamp) { // since this function may not access the underlying inner store, we need to validate // if store is open outside as well. validateStoreOpen(); @@ -145,7 +149,8 @@ class CachingWindowStore } @Override - public byte[] fetch(final Bytes key, final long timestamp) { + public byte[] fetch(final Bytes key, + final long timestamp) { validateStoreOpen(); final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0); final Bytes cacheKey = cacheFunction.cacheKey(bytesKey); @@ -162,7 +167,9 @@ class CachingWindowStore @SuppressWarnings("deprecation") @Override - public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) { + public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, + final long timeFrom, + final long timeTo) { // since this function may not access the underlying inner store, we need to validate // if store is open outside as well. validateStoreOpen(); @@ -175,10 +182,7 @@ class CachingWindowStore final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo); - final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, - key, - timeFrom, - timeTo); + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, key, timeFrom, timeTo); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator( cacheIterator, hasNextCondition, cacheFunction ); @@ -188,12 +192,16 @@ class CachingWindowStore @SuppressWarnings("deprecation") @Override - public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) { + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, + final Bytes to, + final long timeFrom, + final long timeTo) { // since this function may not access the underlying inner store, we need to validate // if store is open outside as well. validateStoreOpen(); - final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetch(from, to, timeFrom, timeTo); + final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = + wrapped().fetch(from, to, timeFrom, timeTo); if (cache == null) { return underlyingIterator; } @@ -201,10 +209,7 @@ class CachingWindowStore final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(to, timeTo)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo); - final HasNextCondition hasNextCondition = keySchema.hasNextCondition(from, - to, - timeFrom, - timeTo); + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(from, to, timeFrom, timeTo); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); return new MergedSortedCacheWindowStoreKeyValueIterator( @@ -218,16 +223,16 @@ class CachingWindowStore @SuppressWarnings("deprecation") @Override - public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { + public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, + final long timeTo) { validateStoreOpen(); final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetchAll(timeFrom, timeTo); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(null, null, timeFrom, timeTo); - final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, - hasNextCondition, - cacheFunction); + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = + new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); return new MergedSortedCacheWindowStoreKeyValueIterator( filteredCacheIterator, underlyingIterator, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java index 02568b6..02e4c6a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java @@ -23,6 +23,7 @@ import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserial import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.timestamp; public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKeyValueBytesStore { + ChangeLoggingTimestampedKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) { super(inner); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java similarity index 79% copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java copy to streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java index 02568b6..94362d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java @@ -17,14 +17,16 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.WindowStore; import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue; import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.timestamp; -public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKeyValueBytesStore { - ChangeLoggingTimestampedKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) { - super(inner); +class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesStore { + + ChangeLoggingTimestampedWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore, + final boolean retainDuplicates) { + super(bytesStore, retainDuplicates); } @Override @@ -36,4 +38,4 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey changeLogger.logChange(key, null); } } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index 7f7612e..ef5a4c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -36,10 +36,11 @@ class ChangeLoggingWindowBytesStore implements WindowStore<Bytes, byte[]> { private final boolean retainDuplicates; - private StoreChangeLogger<Bytes, byte[]> changeLogger; private ProcessorContext context; private int seqnum = 0; + StoreChangeLogger<Bytes, byte[]> changeLogger; + ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore, final boolean retainDuplicates) { super(bytesStore); @@ -47,19 +48,37 @@ class ChangeLoggingWindowBytesStore } @Override - public byte[] fetch(final Bytes key, final long timestamp) { + public void init(final ProcessorContext context, + final StateStore root) { + this.context = context; + super.init(context, root); + final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); + changeLogger = new StoreChangeLogger<>( + name(), + context, + new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray())); + } + + @Override + public byte[] fetch(final Bytes key, + final long timestamp) { return wrapped().fetch(key, timestamp); } @SuppressWarnings("deprecation") @Override - public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, final long to) { + public WindowStoreIterator<byte[]> fetch(final Bytes key, + final long from, + final long to) { return wrapped().fetch(key, from, to); } @SuppressWarnings("deprecation") @Override - public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) { + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, + final Bytes keyTo, + final long from, + final long to) { return wrapped().fetch(keyFrom, keyTo, from, to); } @@ -70,7 +89,8 @@ class ChangeLoggingWindowBytesStore @SuppressWarnings("deprecation") @Override - public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { + public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, + final long timeTo) { return wrapped().fetchAll(timeFrom, timeTo); } @@ -84,20 +104,16 @@ class ChangeLoggingWindowBytesStore } @Override - public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) { + public void put(final Bytes key, + final byte[] value, + final long windowStartTimestamp) { wrapped().put(key, value, windowStartTimestamp); - changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, maybeUpdateSeqnumForDups()), value); + log(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, maybeUpdateSeqnumForDups()), value); } - @Override - public void init(final ProcessorContext context, final StateStore root) { - this.context = context; - super.init(context, root); - final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); - changeLogger = new StoreChangeLogger<>( - name(), - context, - new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray())); + void log(final Bytes key, + final byte[] value) { + changeLogger.logChange(key, value); } private int maybeUpdateSeqnumForDups() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java new file mode 100644 index 0000000..1c10491 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; + +/** + * A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its + * inner WindowStore implementation do not need to provide its own metrics collecting functionality. + * The inner {@link WindowStore} of this class is of type <Bytes,byte[]>, hence we use {@link Serde}s + * to convert from <K,ValueAndTimestamp<V>> to <Bytes,byte[]> + * @param <K> + * @param <V> + */ +class MeteredTimestampedWindowStore<K, V> + extends MeteredWindowStore<K, ValueAndTimestamp<V>> + implements TimestampedWindowStore<K, V> { + + MeteredTimestampedWindowStore(final WindowStore<Bytes, byte[]> inner, + final long windowSizeMs, + final String metricScope, + final Time time, + final Serde<K> keySerde, + final Serde<ValueAndTimestamp<V>> valueSerde) { + super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde); + } + + @SuppressWarnings("unchecked") + @Override + void initStoreSerde(final ProcessorContext context) { + serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index c040e67..681b210 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -43,13 +43,13 @@ public class MeteredWindowStore<K, V> private final long windowSizeMs; private final String metricScope; private final Time time; - private final Serde<K> keySerde; - private final Serde<V> valueSerde; + final Serde<K> keySerde; + final Serde<V> valueSerde; + StateSerdes<K, V> serdes; private StreamsMetricsImpl metrics; private Sensor putTime; private Sensor fetchTime; private Sensor flushTime; - private StateSerdes<K, V> serdes; private ProcessorContext context; private String taskName; @@ -67,15 +67,11 @@ public class MeteredWindowStore<K, V> this.valueSerde = valueSerde; } - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, final StateStore root) { this.context = context; - serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + initStoreSerde(context); metrics = (StreamsMetricsImpl) context.metrics(); taskName = context.taskId().toString(); @@ -102,6 +98,14 @@ public class MeteredWindowStore<K, V> } @SuppressWarnings("unchecked") + void initStoreSerde(final ProcessorContext context) { + serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + } + + @SuppressWarnings("unchecked") @Override public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener, final boolean sendOldValues) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java new file mode 100644 index 0000000..dcb0d44 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + +import java.util.Objects; + +public class TimestampedWindowStoreBuilder<K, V> + extends AbstractStoreBuilder<K, ValueAndTimestamp<V>, TimestampedWindowStore<K, V>> { + + private final WindowBytesStoreSupplier storeSupplier; + + public TimestampedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, + final Serde<K> keySerde, + final Serde<V> valueSerde, + final Time time) { + super(storeSupplier.name(), keySerde, new ValueAndTimestampSerde<>(valueSerde), time); + Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); + this.storeSupplier = storeSupplier; + } + + @Override + public TimestampedWindowStore<K, V> build() { + return new MeteredTimestampedWindowStore<>( + maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), + storeSupplier.windowSize(), + storeSupplier.metricsScope(), + time, + keySerde, + valueSerde); + } + + private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) { + if (!enableCaching) { + return inner; + } + return new CachingWindowStore( + inner, + storeSupplier.windowSize(), + storeSupplier.segmentIntervalMs()); + } + + private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) { + if (!enableLogging) { + return inner; + } + return new ChangeLoggingTimestampedWindowBytesStore(inner, storeSupplier.retainDuplicates()); + } + + public long retentionPeriod() { + return storeSupplier.retentionPeriod(); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index b29b04c..9b36ec7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -27,6 +27,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -55,22 +57,24 @@ public class ProcessorContextImplTest { private ProcessorContextImpl context; private static final String KEY = "key"; - private static final long VAL = 42L; + private static final long VALUE = 42L; + private static final ValueAndTimestamp<Long> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make(42L, 21L); private static final String STORE_NAME = "underlying-store"; private boolean flushExecuted; private boolean putExecuted; + private boolean putWithTimestampExecuted; private boolean putIfAbsentExecuted; private boolean putAllExecuted; private boolean deleteExecuted; private boolean removeExecuted; - private boolean put3argExecuted; private KeyValueIterator<String, Long> rangeIter; private KeyValueIterator<String, Long> allIter; - private List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7); - private WindowStoreIterator<Long> windowStoreIter; + private final List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7); + private final List<KeyValueIterator<Windowed<String>, ValueAndTimestamp<Long>>> timestampedIters = new ArrayList<>(7); + private WindowStoreIterator windowStoreIter; @Before public void setup() { @@ -80,7 +84,6 @@ public class ProcessorContextImplTest { putAllExecuted = false; deleteExecuted = false; removeExecuted = false; - put3argExecuted = false; rangeIter = mock(KeyValueIterator.class); allIter = mock(KeyValueIterator.class); @@ -88,6 +91,7 @@ public class ProcessorContextImplTest { for (int i = 0; i < 7; i++) { iters.add(i, mock(KeyValueIterator.class)); + timestampedIters.add(i, mock(KeyValueIterator.class)); } final StreamsConfig streamsConfig = mock(StreamsConfig.class); @@ -100,11 +104,13 @@ public class ProcessorContextImplTest { expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock()); expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock()); + expect(stateManager.getGlobalStore("GlobalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock()); expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock()); expect(stateManager.getGlobalStore(anyString())).andReturn(null); expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock()); expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock()); + expect(stateManager.getStore("LocalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock()); expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock()); replay(stateManager); @@ -120,7 +126,11 @@ public class ProcessorContextImplTest { ); context.setCurrentNode(new ProcessorNode<String, Long>("fake", null, - new HashSet<>(asList("LocalKeyValueStore", "LocalWindowStore", "LocalSessionStore")))); + new HashSet<>(asList( + "LocalKeyValueStore", + "LocalWindowStore", + "LocalTimestampedWindowStore", + "LocalSessionStore")))); } @Test @@ -134,10 +144,10 @@ public class ProcessorContextImplTest { checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()"); checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()"); - assertEquals((Long) VAL, store.get(KEY)); + assertEquals((Long) VALUE, store.get(KEY)); assertEquals(rangeIter, store.range("one", "two")); assertEquals(allIter, store.all()); - assertEquals(VAL, store.approximateNumEntries()); + assertEquals(VALUE, store.approximateNumEntries()); }); } @@ -153,12 +163,29 @@ public class ProcessorContextImplTest { assertEquals(iters.get(0), store.fetchAll(0L, 0L)); assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L)); assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L)); - assertEquals((Long) VAL, store.fetch(KEY, 1L)); + assertEquals((Long) VALUE, store.fetch(KEY, 1L)); assertEquals(iters.get(2), store.all()); }); } @Test + public void globalTimestampedWindowStoreShouldBeReadOnly() { + doTest("GlobalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + checkThrowsUnsupportedOperation(store::flush, "flush()"); + checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L), 1L), "put() [with timestamp]"); + checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L)), "put() [no timestamp]"); + + assertEquals(timestampedIters.get(0), store.fetchAll(0L, 0L)); + assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L)); + assertEquals(timestampedIters.get(1), store.fetch(KEY, KEY, 0L, 1L)); + assertEquals(VALUE_AND_TIMESTAMP, store.fetch(KEY, 1L)); + assertEquals(timestampedIters.get(2), store.all()); + }); + } + + @Test public void globalSessionStoreShouldBeReadOnly() { doTest("GlobalSessionStore", (Consumer<SessionStore<String, Long>>) store -> { verifyStoreCannotBeInitializedOrClosed(store); @@ -194,10 +221,10 @@ public class ProcessorContextImplTest { store.delete("1"); assertTrue(deleteExecuted); - assertEquals((Long) VAL, store.get(KEY)); + assertEquals((Long) VALUE, store.get(KEY)); assertEquals(rangeIter, store.range("one", "two")); assertEquals(allIter, store.all()); - assertEquals(VAL, store.approximateNumEntries()); + assertEquals(VALUE, store.approximateNumEntries()); }); } @@ -212,18 +239,37 @@ public class ProcessorContextImplTest { store.put("1", 1L); assertTrue(putExecuted); - store.put("1", 1L, 1L); - assertTrue(put3argExecuted); - assertEquals(iters.get(0), store.fetchAll(0L, 0L)); assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L)); assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L)); - assertEquals((Long) VAL, store.fetch(KEY, 1L)); + assertEquals((Long) VALUE, store.fetch(KEY, 1L)); assertEquals(iters.get(2), store.all()); }); } @Test + public void localTimestampedWindowStoreShouldNotAllowInitOrClose() { + doTest("LocalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + store.flush(); + assertTrue(flushExecuted); + + store.put("1", ValueAndTimestamp.make(1L, 1L)); + assertTrue(putExecuted); + + store.put("1", ValueAndTimestamp.make(1L, 1L), 1L); + assertTrue(putWithTimestampExecuted); + + assertEquals(timestampedIters.get(0), store.fetchAll(0L, 0L)); + assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L)); + assertEquals(timestampedIters.get(1), store.fetch(KEY, KEY, 0L, 1L)); + assertEquals(VALUE_AND_TIMESTAMP, store.fetch(KEY, 1L)); + assertEquals(timestampedIters.get(2), store.all()); + }); + } + + @Test public void localSessionStoreShouldNotAllowInitOrClose() { doTest("LocalSessionStore", (Consumer<SessionStore<String, Long>>) store -> { verifyStoreCannotBeInitializedOrClosed(store); @@ -250,8 +296,8 @@ public class ProcessorContextImplTest { initStateStoreMock(keyValueStoreMock); - expect(keyValueStoreMock.get(KEY)).andReturn(VAL); - expect(keyValueStoreMock.approximateNumEntries()).andReturn(VAL); + expect(keyValueStoreMock.get(KEY)).andReturn(VALUE); + expect(keyValueStoreMock.approximateNumEntries()).andReturn(VALUE); expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter); expect(keyValueStoreMock.all()).andReturn(allIter); @@ -286,6 +332,7 @@ public class ProcessorContextImplTest { return keyValueStoreMock; } + @SuppressWarnings("unchecked") private WindowStore<String, Long> windowStoreMock() { final WindowStore<String, Long> windowStore = mock(WindowStore.class); @@ -294,7 +341,7 @@ public class ProcessorContextImplTest { expect(windowStore.fetchAll(anyLong(), anyLong())).andReturn(iters.get(0)); expect(windowStore.fetch(anyString(), anyString(), anyLong(), anyLong())).andReturn(iters.get(1)); expect(windowStore.fetch(anyString(), anyLong(), anyLong())).andReturn(windowStoreIter); - expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL); + expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE); expect(windowStore.all()).andReturn(iters.get(2)); windowStore.put(anyString(), anyLong()); @@ -303,9 +350,32 @@ public class ProcessorContextImplTest { return null; }); - windowStore.put(anyString(), anyLong(), anyLong()); + replay(windowStore); + + return windowStore; + } + + @SuppressWarnings("unchecked") + private TimestampedWindowStore<String, Long> timestampedWindowStoreMock() { + final TimestampedWindowStore<String, Long> windowStore = mock(TimestampedWindowStore.class); + + initStateStoreMock(windowStore); + + expect(windowStore.fetchAll(anyLong(), anyLong())).andReturn(timestampedIters.get(0)); + expect(windowStore.fetch(anyString(), anyString(), anyLong(), anyLong())).andReturn(timestampedIters.get(1)); + expect(windowStore.fetch(anyString(), anyLong(), anyLong())).andReturn(windowStoreIter); + expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE_AND_TIMESTAMP); + expect(windowStore.all()).andReturn(timestampedIters.get(2)); + + windowStore.put(anyString(), anyObject(ValueAndTimestamp.class)); + expectLastCall().andAnswer(() -> { + putExecuted = true; + return null; + }); + + windowStore.put(anyString(), anyObject(ValueAndTimestamp.class), anyLong()); expectLastCall().andAnswer(() -> { - put3argExecuted = true; + putWithTimestampExecuted = true; return null; }); @@ -360,9 +430,7 @@ public class ProcessorContextImplTest { @SuppressWarnings("unchecked") public void init(final ProcessorContext context) { final T store = (T) context.getStateStore(name); - checker.accept(store); - } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java similarity index 63% copy from streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java copy to streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java index a36b101..edf210e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java @@ -20,9 +20,9 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.NoOpRecordCollector; import org.easymock.EasyMock; @@ -38,12 +38,13 @@ import java.util.Map; import static java.time.Instant.ofEpochMilli; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; @RunWith(EasyMockRunner.class) -public class ChangeLoggingWindowBytesStoreTest { +public class ChangeLoggingTimestampedWindowBytesStoreTest { private final TaskId taskId = new TaskId(0, 0); - private final Map<Object, Object> sent = new HashMap<>(); + private final Map<Object, ValueAndTimestamp<Object>> sent = new HashMap<>(); private final NoOpRecordCollector collector = new NoOpRecordCollector() { @Override public <K, V> void send(final String topic, @@ -54,23 +55,24 @@ public class ChangeLoggingWindowBytesStoreTest { final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { - sent.put(key, value); + sent.put(key, ValueAndTimestamp.make(value, timestamp)); } }; - private final byte[] value1 = {0}; - private final Bytes bytesKey = Bytes.wrap(value1); + private final byte[] value = {0}; + private final byte[] valueAndTimestamp = {0, 0, 0, 0, 0, 0, 0, 42, 0}; + private final Bytes bytesKey = Bytes.wrap(value); @Mock(type = MockType.NICE) private WindowStore<Bytes, byte[]> inner; @Mock(type = MockType.NICE) private ProcessorContextImpl context; - private ChangeLoggingWindowBytesStore store; + private ChangeLoggingTimestampedWindowBytesStore store; @Before public void setUp() { - store = new ChangeLoggingWindowBytesStore(inner, false); + store = new ChangeLoggingTimestampedWindowBytesStore(inner, false); } private void init() { @@ -85,20 +87,27 @@ public class ChangeLoggingWindowBytesStoreTest { @Test public void shouldLogPuts() { - inner.put(bytesKey, value1, 0); + inner.put(bytesKey, valueAndTimestamp, 0); EasyMock.expectLastCall(); init(); - store.put(bytesKey, value1); + store.put(bytesKey, valueAndTimestamp); - assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0))); + assertArrayEquals( + value, + (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)).value()); + assertEquals( + 42L, + sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)).timestamp()); EasyMock.verify(inner); } @Test public void shouldDelegateToUnderlyingStoreWhenFetching() { - EasyMock.expect(inner.fetch(bytesKey, 0, 10)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator()); + EasyMock + .expect(inner.fetch(bytesKey, 0, 10)) + .andReturn(KeyValueIterators.emptyWindowStoreIterator()); init(); @@ -108,7 +117,9 @@ public class ChangeLoggingWindowBytesStoreTest { @Test public void shouldDelegateToUnderlyingStoreWhenFetchingRange() { - EasyMock.expect(inner.fetch(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator()); + EasyMock + .expect(inner.fetch(bytesKey, bytesKey, 0, 1)) + .andReturn(KeyValueIterators.emptyIterator()); init(); @@ -118,16 +129,26 @@ public class ChangeLoggingWindowBytesStoreTest { @Test public void shouldRetainDuplicatesWhenSet() { - store = new ChangeLoggingWindowBytesStore(inner, true); - inner.put(bytesKey, value1, 0); + store = new ChangeLoggingTimestampedWindowBytesStore(inner, true); + inner.put(bytesKey, valueAndTimestamp, 0); EasyMock.expectLastCall().times(2); init(); - store.put(bytesKey, value1); - store.put(bytesKey, value1); - - assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1))); - assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2))); + store.put(bytesKey, valueAndTimestamp); + store.put(bytesKey, valueAndTimestamp); + + assertArrayEquals( + value, + (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)).value()); + assertEquals( + 42L, + sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)).timestamp()); + assertArrayEquals( + value, + (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)).value()); + assertEquals( + 42L, + sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)).timestamp()); EasyMock.verify(inner); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java index a36b101..d7ad6d2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java @@ -20,7 +20,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; import org.apache.kafka.streams.state.WindowStore; @@ -58,8 +57,8 @@ public class ChangeLoggingWindowBytesStoreTest { } }; - private final byte[] value1 = {0}; - private final Bytes bytesKey = Bytes.wrap(value1); + private final byte[] value = {0}; + private final Bytes bytesKey = Bytes.wrap(value); @Mock(type = MockType.NICE) private WindowStore<Bytes, byte[]> inner; @@ -85,20 +84,24 @@ public class ChangeLoggingWindowBytesStoreTest { @Test public void shouldLogPuts() { - inner.put(bytesKey, value1, 0); + inner.put(bytesKey, value, 0); EasyMock.expectLastCall(); init(); - store.put(bytesKey, value1); + store.put(bytesKey, value); - assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0))); + assertArrayEquals( + value, + (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0))); EasyMock.verify(inner); } @Test public void shouldDelegateToUnderlyingStoreWhenFetching() { - EasyMock.expect(inner.fetch(bytesKey, 0, 10)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator()); + EasyMock + .expect(inner.fetch(bytesKey, 0, 10)) + .andReturn(KeyValueIterators.emptyWindowStoreIterator()); init(); @@ -108,7 +111,9 @@ public class ChangeLoggingWindowBytesStoreTest { @Test public void shouldDelegateToUnderlyingStoreWhenFetchingRange() { - EasyMock.expect(inner.fetch(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator()); + EasyMock + .expect(inner.fetch(bytesKey, bytesKey, 0, 1)) + .andReturn(KeyValueIterators.emptyIterator()); init(); @@ -119,15 +124,15 @@ public class ChangeLoggingWindowBytesStoreTest { @Test public void shouldRetainDuplicatesWhenSet() { store = new ChangeLoggingWindowBytesStore(inner, true); - inner.put(bytesKey, value1, 0); + inner.put(bytesKey, value, 0); EasyMock.expectLastCall().times(2); init(); - store.put(bytesKey, value1); - store.put(bytesKey, value1); + store.put(bytesKey, value); + store.put(bytesKey, value); - assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1))); - assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2))); + assertArrayEquals(value, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1))); + assertArrayEquals(value, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2))); EasyMock.verify(inner); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java new file mode 100644 index 0000000..a3522f3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.NoOpRecordCollector; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertNull; + +public class MeteredTimestampWindowStoreTest { + private InternalMockProcessorContext context; + @SuppressWarnings("unchecked") + private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class); + private final MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>( + innerStoreMock, + 10L, // any size + "scope", + new MockTime(), + Serdes.String(), + new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull()) + ); + private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG)); + + { + EasyMock.expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes(); + } + + @Before + public void setUp() { + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test"); + + context = new InternalMockProcessorContext( + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.Long(), + streamsMetrics, + new StreamsConfig(StreamsTestUtils.getStreamsConfig()), + NoOpRecordCollector::new, + new ThreadCache(new LogContext("testCache "), 0, streamsMetrics) + ); + } + + @Test + public void shouldCloseUnderlyingStore() { + innerStoreMock.close(); + EasyMock.expectLastCall(); + EasyMock.replay(innerStoreMock); + + store.init(context, store); + store.close(); + EasyMock.verify(innerStoreMock); + } + + @Test + public void shouldNotExceptionIfFetchReturnsNull() { + EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null); + EasyMock.replay(innerStoreMock); + + store.init(context, store); + assertNull(store.fetch("a", 0)); + } + +}