[ https://issues.apache.org/jira/browse/KAFKA-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16469741#comment-16469741 ]
ASF GitHub Bot commented on KAFKA-6813: --------------------------------------- guozhangwang closed pull request #4976: KAFKA-6813: Remove deprecated APIs in KIP-182, Part II URL: https://github.com/apache/kafka/pull/4976 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 3c6539902e8..497bdac6d89 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.internals.Topic; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueTransformer; @@ -26,12 +24,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.WindowStore; import java.util.HashSet; import java.util.Objects; @@ -81,38 +74,6 @@ public R apply(T2 value2, T1 value1) { }; } - @SuppressWarnings({"unchecked", "deprecation"}) - static <T, K> 
org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde, - final Serde<T> aggValueSerde, - final String storeName) { - Objects.requireNonNull(storeName, "storeName can't be null"); - Topic.validate(storeName); - return storeFactory(keySerde, aggValueSerde, storeName).build(); - } - - @SuppressWarnings({"unchecked", "deprecation"}) - static <W extends Window, T, K> org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde, - final Serde<T> aggValSerde, - final Windows<W> windows, - final String storeName) { - Objects.requireNonNull(storeName, "storeName can't be null"); - Topic.validate(storeName); - return storeFactory(keySerde, aggValSerde, storeName) - .windowed(windows.size(), windows.maintainMs(), windows.segments, false) - .build(); - } - - @SuppressWarnings("deprecation") - static <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde, - final Serde<T> aggValueSerde, - final String storeName) { - return Stores.create(storeName) - .withKeys(keySerde) - .withValues(aggValueSerde) - .persistent() - .enableCaching(); - } - static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) { Objects.requireNonNull(valueMapper, "valueMapper can't be null"); return new ValueMapperWithKey<K, V, VR>() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index daa2915d2c1..27b985b7160 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -22,13 +22,8 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; -import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier; -import 
org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier; import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache; -import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; -import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier; -import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier; @@ -37,9 +32,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; /** @@ -244,411 +236,5 @@ public static SessionBytesStoreSupplier persistentSessionStore(final String name Objects.requireNonNull(supplier, "supplier cannot be null"); return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } - - /** - * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance. 
- * - * @param name the name of the store - * @return the factory that can be used to specify other options or configurations for the store; never null - * @deprecated use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)} - * {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, int)}, or {@link #inMemoryKeyValueStore(String)} - */ - @Deprecated - public static StoreFactory create(final String name) { - return new StoreFactory() { - @Override - public <K> ValueFactory<K> withKeys(final Serde<K> keySerde) { - return new ValueFactory<K>() { - @Override - public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) { - - return new KeyValueFactory<K, V>() { - - @Override - public InMemoryKeyValueFactory<K, V> inMemory() { - return new InMemoryKeyValueFactory<K, V>() { - private int capacity = Integer.MAX_VALUE; - private final Map<String, String> logConfig = new HashMap<>(); - private boolean logged = true; - - /** - * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2 - * @throws IllegalArgumentException if the capacity of the store is zero or negative - */ - @Override - public InMemoryKeyValueFactory<K, V> maxEntries(int capacity) { - if (capacity < 1) throw new IllegalArgumentException("The capacity must be positive"); - this.capacity = capacity; - return this; - } - - @Override - public InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config) { - logged = true; - logConfig.putAll(config); - return this; - } - - @Override - public InMemoryKeyValueFactory<K, V> disableLogging() { - logged = false; - logConfig.clear(); - return this; - } - - @Override - public org.apache.kafka.streams.processor.StateStoreSupplier build() { - log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged); - if (capacity < Integer.MAX_VALUE) { - return new InMemoryLRUCacheStoreSupplier<>(name, capacity, 
keySerde, valueSerde, logged, logConfig); - } - return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig); - } - }; - } - - @Override - public PersistentKeyValueFactory<K, V> persistent() { - return new PersistentKeyValueFactory<K, V>() { - boolean cachingEnabled; - private long windowSize; - private final Map<String, String> logConfig = new HashMap<>(); - private int numSegments = 0; - private long retentionPeriod = 0L; - private boolean retainDuplicates = false; - private boolean sessionWindows; - private boolean logged = true; - - @Override - public PersistentKeyValueFactory<K, V> windowed(final long windowSize, final long retentionPeriod, final int numSegments, final boolean retainDuplicates) { - if (numSegments < RocksDBWindowStoreSupplier.MIN_SEGMENTS) { - throw new IllegalArgumentException("numSegments must be >= " + RocksDBWindowStoreSupplier.MIN_SEGMENTS); - } - this.windowSize = windowSize; - this.numSegments = numSegments; - this.retentionPeriod = retentionPeriod; - this.retainDuplicates = retainDuplicates; - this.sessionWindows = false; - - return this; - } - - @Override - public PersistentKeyValueFactory<K, V> sessionWindowed(final long retentionPeriod) { - this.sessionWindows = true; - this.retentionPeriod = retentionPeriod; - return this; - } - - @Override - public PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config) { - logged = true; - logConfig.putAll(config); - return this; - } - - @Override - public PersistentKeyValueFactory<K, V> disableLogging() { - logged = false; - logConfig.clear(); - return this; - } - - @Override - public PersistentKeyValueFactory<K, V> enableCaching() { - cachingEnabled = true; - return this; - } - - @Override - public org.apache.kafka.streams.processor.StateStoreSupplier build() { - log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged); - if (sessionWindows) { - return new RocksDBSessionStoreSupplier<>(name, 
retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled); - } else if (numSegments > 0) { - return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig, cachingEnabled); - } - return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig, cachingEnabled); - } - - }; - } - - - }; - } - }; - } - }; - } - - - public static abstract class StoreFactory { - /** - * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s. - * - * @return the interface used to specify the type of values; never null - */ - public ValueFactory<String> withStringKeys() { - return withKeys(Serdes.String()); - } - - /** - * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Integer}s. - * - * @return the interface used to specify the type of values; never null - */ - public ValueFactory<Integer> withIntegerKeys() { - return withKeys(Serdes.Integer()); - } - - /** - * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Long}s. - * - * @return the interface used to specify the type of values; never null - */ - public ValueFactory<Long> withLongKeys() { - return withKeys(Serdes.Long()); - } - - /** - * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Double}s. - * - * @return the interface used to specify the type of values; never null - */ - public ValueFactory<Double> withDoubleKeys() { - return withKeys(Serdes.Double()); - } - - /** - * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link ByteBuffer}. - * - * @return the interface used to specify the type of values; never null - */ - public ValueFactory<ByteBuffer> withByteBufferKeys() { - return withKeys(Serdes.ByteBuffer()); - } - - /** - * Begin to create a {@link KeyValueStore} by specifying the keys will be byte arrays. 
- * - * @return the interface used to specify the type of values; never null - */ - public ValueFactory<byte[]> withByteArrayKeys() { - return withKeys(Serdes.ByteArray()); - } - - /** - * Begin to create a {@link KeyValueStore} by specifying the keys. - * - * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes - * @return the interface used to specify the type of values; never null - */ - public <K> ValueFactory<K> withKeys(Class<K> keyClass) { - return withKeys(Serdes.serdeFrom(keyClass)); - } - - /** - * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys. - * - * @param keySerde the serialization factory for keys; may be null - * @return the interface used to specify the type of values; never null - */ - public abstract <K> ValueFactory<K> withKeys(Serde<K> keySerde); - } - - /** - * The factory for creating off-heap key-value stores. - * - * @param <K> the type of keys - */ - public static abstract class ValueFactory<K> { - /** - * Use {@link String} values. - * - * @return the interface used to specify the remaining key-value store options; never null - */ - public KeyValueFactory<K, String> withStringValues() { - return withValues(Serdes.String()); - } - - /** - * Use {@link Integer} values. - * - * @return the interface used to specify the remaining key-value store options; never null - */ - public KeyValueFactory<K, Integer> withIntegerValues() { - return withValues(Serdes.Integer()); - } - - /** - * Use {@link Long} values. - * - * @return the interface used to specify the remaining key-value store options; never null - */ - public KeyValueFactory<K, Long> withLongValues() { - return withValues(Serdes.Long()); - } - - /** - * Use {@link Double} values. 
- * - * @return the interface used to specify the remaining key-value store options; never null - */ - public KeyValueFactory<K, Double> withDoubleValues() { - return withValues(Serdes.Double()); - } - - /** - * Use {@link ByteBuffer} for values. - * - * @return the interface used to specify the remaining key-value store options; never null - */ - public KeyValueFactory<K, ByteBuffer> withByteBufferValues() { - return withValues(Serdes.ByteBuffer()); - } - - /** - * Use byte arrays for values. - * - * @return the interface used to specify the remaining key-value store options; never null - */ - public KeyValueFactory<K, byte[]> withByteArrayValues() { - return withValues(Serdes.ByteArray()); - } - - /** - * Use values of the specified type. - * - * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serdes - * @return the interface used to specify the remaining key-value store options; never null - */ - public <V> KeyValueFactory<K, V> withValues(Class<V> valueClass) { - return withValues(Serdes.serdeFrom(valueClass)); - } - - /** - * Use the specified serializer and deserializer for the values. - * - * @param valueSerde the serialization factory for values; may be null - * @return the interface used to specify the remaining key-value store options; never null - */ - public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde); - } - - - public interface KeyValueFactory<K, V> { - /** - * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be - * read to restore the entries if they are lost. - * - * @return the factory to create in-memory key-value stores; never null - */ - InMemoryKeyValueFactory<K, V> inMemory(); - - /** - * Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka - * topic that can be read to restore the entries if they are lost. 
- * - * @return the factory to create persistent key-value stores; never null - */ - PersistentKeyValueFactory<K, V> persistent(); - } - - /** - * The interface used to create in-memory key-value stores. - * - * @param <K> the type of keys - * @param <V> the type of values - */ - @Deprecated - public interface InMemoryKeyValueFactory<K, V> { - /** - * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is - * equivalent to not placing a limit on the number of entries. - * - * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2 - * @return this factory - * @throws IllegalArgumentException if the capacity is not positive - */ - InMemoryKeyValueFactory<K, V> maxEntries(int capacity); - - /** - * Indicates that a changelog should be created for the store. The changelog will be created - * with the provided cleanupPolicy and configs. - * - * Note: Any unrecognized configs will be ignored. - * @param config any configs that should be applied to the changelog - * @return the factory to create an in-memory key-value store - */ - InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config); - - /** - * Indicates that a changelog should not be created for the key-value store - * @return the factory to create an in-memory key-value store - */ - InMemoryKeyValueFactory<K, V> disableLogging(); - - - /** - * Return the instance of StateStoreSupplier of new key-value store. - * @return the state store supplier; never null - */ - org.apache.kafka.streams.processor.StateStoreSupplier build(); - } - - /** - * The interface used to create off-heap key-value stores that use a local database. 
- * - * @param <K> the type of keys - * @param <V> the type of values - */ - @Deprecated - public interface PersistentKeyValueFactory<K, V> { - - /** - * Set the persistent store as a windowed key-value store - * @param windowSize size of the windows - * @param retentionPeriod the maximum period of time in milli-second to keep each window in this store - * @param numSegments the maximum number of segments for rolling the windowed store - * @param retainDuplicates whether or not to retain duplicate data within the window - */ - PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates); - - /** - * Set the persistent store as a {@link SessionStore} for use with {@link org.apache.kafka.streams.kstream.SessionWindows} - * @param retentionPeriod period of time in milliseconds to keep each window in this store - */ - PersistentKeyValueFactory<K, V> sessionWindowed(final long retentionPeriod); - - /** - * Indicates that a changelog should be created for the store. The changelog will be created - * with the provided cleanupPolicy and configs. - * - * Note: Any unrecognized configs will be ignored. - * @param config any configs that should be applied to the changelog - * @return the factory to create a persistent key-value store - */ - PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config); - - /** - * Indicates that a changelog should not be created for the key-value store - * @return the factory to create a persistent key-value store - */ - PersistentKeyValueFactory<K, V> disableLogging(); - - /** - * Caching should be enabled on the created store. - * @return the factory to create a persistent key-value store - */ - PersistentKeyValueFactory<K, V> enableCaching(); - - /** - * Return the instance of StateStoreSupplier of new key-value store. 
- * @return the key-value store; never null - */ - org.apache.kafka.streams.processor.StateStoreSupplier build(); - - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java deleted file mode 100644 index f9554212e01..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.state.KeyValueStore; - -import java.util.Map; - -/** - * An in-memory key-value store based on a TreeMap. - * - * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior. - * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class, - * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}. 
- * - * @param <K> The key type - * @param <V> The value type - * - * @see org.apache.kafka.streams.state.Stores#create(String) - */ -@Deprecated -public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> { - - public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) { - this(name, keySerde, valueSerde, null, logged, logConfig); - } - - public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) { - super(name, keySerde, valueSerde, time, logged, logConfig); - } - - public KeyValueStore get() { - InMemoryKeyValueStore<K, V> store = new InMemoryKeyValueStore<>(name, keySerde, valueSerde); - - return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "in-memory-state", time); - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java deleted file mode 100644 index 0f897ba8823..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.state.KeyValueStore; - -import java.util.Map; - -/** - * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries. - * - * @param <K> The key type - * @param <V> The value type - * - */ -@Deprecated -public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> { - - private final int capacity; - - public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) { - this(name, capacity, keySerde, valueSerde, null, logged, logConfig); - } - - private InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) { - super(name, keySerde, valueSerde, time, logged, logConfig); - this.capacity = capacity; - } - - public KeyValueStore get() { - MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde); - return new MeteredKeyValueStore<>(logged ? 
cache.enableLogging() : cache, "in-memory-lru-state", time); - } - -} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index b99c907ab44..1957aa4753e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -37,12 +37,9 @@ * * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior. * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class, * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}. - * * @param <K> The key type * @param <V> The value type - * - * @see org.apache.kafka.streams.state.Stores#create(String) */ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java index 4b233f069ea..3bc56c2b670 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java @@ -27,7 +27,6 @@ * * @param <K> the type of keys * @param <V> the type of values - * @see org.apache.kafka.streams.state.Stores#create(String) */ @Deprecated public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java deleted file mode 100644 index 1552f7dfbfd..00000000000 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.state.SessionStore; - -import java.util.Map; - -/** - * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database. 
- * - * @param <K> the type of keys - * @param <V> the type of values - * - * @see org.apache.kafka.streams.state.Stores#create(String) - */ -@Deprecated -public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> { - - static final int NUM_SEGMENTS = 3; - private final long retentionPeriod; - private final SessionStoreBuilder<K, V> builder; - - public RocksDBSessionStoreSupplier(String name, long retentionPeriod, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) { - super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig); - this.retentionPeriod = retentionPeriod; - builder = new SessionStoreBuilder<>(new RocksDbSessionBytesStoreSupplier(name, - retentionPeriod), - keySerde, - valueSerde, - time); - if (cached) { - builder.withCachingEnabled(); - } - // logged by default so we only need to worry about when it is disabled. - if (!logged) { - builder.withLoggingDisabled(); - } - } - - public SessionStore<K, V> get() { - return builder.build(); - - } - - public long retentionPeriod() { - return retentionPeriod; - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 281304140d5..d2b8cd2bac6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -62,8 +62,6 @@ * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior. * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class, * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}. 
- * - * @see org.apache.kafka.streams.state.Stores#create(String) */ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java deleted file mode 100644 index 2a82f798745..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.state.WindowStore; - -import java.util.Map; - -/** - * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database. 
- * - * @param <K> the type of keys - * @param <V> the type of values - * - * @see org.apache.kafka.streams.state.Stores#create(String) - */ -@Deprecated -public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> { - public static final int MIN_SEGMENTS = 2; - private final long retentionPeriod; - private WindowStoreBuilder<K, V> builder; - - public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) { - this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, Time.SYSTEM, windowSize, logged, logConfig, enableCaching); - } - - public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) { - super(name, keySerde, valueSerde, time, logged, logConfig); - if (numSegments < MIN_SEGMENTS) { - throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS); - } - this.retentionPeriod = retentionPeriod; - builder = new WindowStoreBuilder<>(new RocksDbWindowBytesStoreSupplier(name, - retentionPeriod, - numSegments, - windowSize, - retainDuplicates), - keySerde, - valueSerde, - time); - if (enableCaching) { - builder.withCachingEnabled(); - } - // logged by default so we only need to worry about when it is disabled. 
- if (!logged) { - builder.withLoggingDisabled(); - } - } - - public WindowStore<K, V> get() { - return builder.build(); - } - - @Override - public long retentionPeriod() { - return retentionPeriod; - } - -} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java index b9b7181c5c3..5a87bc57d56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java @@ -25,6 +25,8 @@ private final String name; private final long retentionPeriod; + private static final int NUM_SEGMENTS = 3; + public RocksDbSessionBytesStoreSupplier(final String name, final long retentionPeriod) { this.name = name; @@ -36,13 +38,12 @@ public String name() { return name; } - @SuppressWarnings("deprecation") @Override public SessionStore<Bytes, byte[]> get() { final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore( name, retentionPeriod, - org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS, + NUM_SEGMENTS, new SessionKeySchema()); return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray()); } @@ -52,11 +53,10 @@ public String metricsScope() { return "rocksdb-session"; } - @SuppressWarnings("deprecation") @Override public long segmentIntervalMs() { return Segments.segmentInterval( retentionPeriod, - org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS); + NUM_SEGMENTS); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java index e1521f8438c..5fbf491dfc4 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java @@ -28,14 +28,15 @@ private final long windowSize; private final boolean retainDuplicates; - @SuppressWarnings("deprecation") + private static final int MIN_SEGMENTS = 2; + public RocksDbWindowBytesStoreSupplier(final String name, final long retentionPeriod, final int segments, final long windowSize, final boolean retainDuplicates) { - if (segments < org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS) { - throw new IllegalArgumentException("numSegments must be >= " + org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS); + if (segments < MIN_SEGMENTS) { + throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS); } this.name = name; this.retentionPeriod = retentionPeriod; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index b947664fe29..8c3716b3542 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -33,8 +33,6 @@ /** * An in-memory LRU cache store similar to {@link MemoryLRUCache} but byte-based, not * record based - * - * @see org.apache.kafka.streams.state.Stores#create(String) */ public class ThreadCache { private final Logger log; diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java new file mode 100644 index 00000000000..f1067667f31 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; + +/** + * This class allows to access the {@link InternalTopologyBuilder} a {@link Topology} object. + * + */ +public class TopologyWrapper extends Topology { + + public InternalTopologyBuilder getInternalBuilder() { + return internalTopologyBuilder; + } + + public void setApplicationId(String applicationId) { + internalTopologyBuilder.setApplicationId(applicationId); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index d0361dc675c..e5160e1db91 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -29,17 +29,17 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; @@ -136,8 +136,6 @@ public void testRegexMatchesTopicsAWhenCreated() throws Exception { final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1"); final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); - final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); - CLUSTER.createTopic("TEST-TOPIC-1"); final StreamsBuilder builder = new StreamsBuilder(); @@ -227,28 +225,27 @@ public boolean conditionMet() { }, STREAM_TASKS_NOT_UPDATED); } - @SuppressWarnings("deprecation") @Test public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException { final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); - final MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false); + final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("testStateStore"), Serdes.String(), Serdes.String()); final long thirtySecondTimeout = 30 * 1000; - final TopologyBuilder builder = new TopologyBuilder() - .addSource("ingest", Pattern.compile("topic-\\d+")) - .addProcessor("my-processor", processorSupplier, "ingest") - .addStateStore(stateStoreSupplier, "my-processor"); 
+ final TopologyWrapper topology = new TopologyWrapper(); + topology.addSource("ingest", Pattern.compile("topic-\\d+")); + topology.addProcessor("my-processor", processorSupplier, "ingest"); + topology.addStateStore(storeBuilder, "my-processor"); + streams = new KafkaStreams(topology, streamsConfiguration); - streams = new KafkaStreams(builder, streamsConfiguration); try { streams.start(); final TestCondition stateStoreNameBoundToSourceTopic = new TestCondition() { @Override public boolean conditionMet() { - final Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics(); + final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics(); final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore"); return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 255c3ebb046..27f0833d760 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -18,9 +18,9 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyTestDriverWrapper; import org.apache.kafka.streams.errors.TopologyBuilderException; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 8cb2eaebe1c..afc9be12c3f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -31,7 +31,8 @@ import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; -import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; @@ -43,7 +44,6 @@ import java.io.File; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; @@ -106,16 +106,16 @@ public void initializeStore() { } private void initStore(final boolean enableCaching) { - final RocksDBSessionStoreSupplier<String, Long> supplier = - new RocksDBSessionStoreSupplier<>( - STORE_NAME, - GAP_MS * 3, + final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, GAP_MS * 3), Serdes.String(), - Serdes.Long(), - false, - Collections.<String, String>emptyMap(), - enableCaching); - sessionStore = supplier.get(); + Serdes.Long()) + .withLoggingDisabled(); + + if (enableCaching) { + storeBuilder.withCachingEnabled(); + } + + sessionStore = storeBuilder.build(); sessionStore.init(context, sessionStore); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index d1d25e9ce4a..93b233b593c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -31,8 +31,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; @@ -547,23 +545,6 @@ public void shouldCorrectlyMapStateStoreToInternalTopics() { assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store")); } - @SuppressWarnings("unchecked") - @Test - public void shouldAddInternalTopicConfigForWindowStores() { - final TopologyBuilder builder = new TopologyBuilder(); - builder.setApplicationId("appId"); - builder.addSource("source", "topic"); - builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor"); - final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); - final TopicsInfo topicsInfo = topicGroups.values().iterator().next(); - final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); - final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000); - assertEquals(2, properties.size()); - assertEquals("40000", properties.get(TopicConfig.RETENTION_MS_CONFIG)); - 
assertEquals("appId-store-changelog", topicConfig.name()); - } - @Test public void shouldAddInternalTopicConfigForNonWindowStores() { final TopologyBuilder builder = new TopologyBuilder(); @@ -594,7 +575,7 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() { } @Test - public void shouldThroughOnUnassignedStateStoreAccess() throws Exception { + public void shouldThrowOnUnassignedStateStoreAccess() { final String sourceNodeName = "source"; final String goodNodeName = "goodGuy"; final String badNodeName = "badGuy"; @@ -603,12 +584,11 @@ public void shouldThroughOnUnassignedStateStoreAccess() throws Exception { config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); - final StreamsConfig streamsConfig = new StreamsConfig(config); final TopologyBuilder builder = new TopologyBuilder(); builder.addSource(sourceNodeName, "topic") .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName) - .addStateStore(Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), goodNodeName) + .addStateStore(new MockStateStoreSupplier(LocalMockProcessorSupplier.STORE_NAME, false), goodNodeName) .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); try { final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, config); @@ -724,6 +704,7 @@ public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternP assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } + @SuppressWarnings("unchecked") @Test public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { @@ -755,10 +736,11 @@ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { 
assertFalse(topics.contains("topic-A")); } + @SuppressWarnings("unchecked") @Test(expected = TopologyBuilderException.class) public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { final String sameNameForSourceAndProcessor = "sameName"; - final TopologyBuilder topologyBuilder = new TopologyBuilder() + new TopologyBuilder() .addGlobalStore(new MockStateStoreSupplier("anyName", false, false), sameNameForSourceAndProcessor, null, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 149a1584f2d..c73593e7165 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -29,11 +29,10 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -63,8 +62,9 @@ public class InternalTopologyBuilderTest { - private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); private final Serde<String> stringSerde = Serdes.String(); + private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); + private final StoreBuilder storeBuilder = 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.ByteArray(), Serdes.ByteArray()); @Test public void shouldAddSourceWithOffsetReset() { @@ -266,14 +266,14 @@ public void testNamedTopicMatchesAlreadyProvidedPattern() { @Test(expected = TopologyException.class) public void testAddStateStoreWithNonExistingProcessor() { - builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor"); + builder.addStateStore(storeBuilder, "no-such-processsor"); } @Test public void testAddStateStoreWithSource() { builder.addSource(null, "source-1", null, null, null, "topic-1"); try { - builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1"); + builder.addStateStore(storeBuilder, "source-1"); fail("Should throw TopologyException with store cannot be added to source"); } catch (final TopologyException expected) { /* ok */ } } @@ -282,36 +282,34 @@ public void testAddStateStoreWithSource() { public void testAddStateStoreWithSink() { builder.addSink("sink-1", "topic-1", null, null, null); try { - builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1"); + builder.addStateStore(storeBuilder, "sink-1"); fail("Should throw TopologyException with store cannot be added to sink"); } catch (final TopologyException expected) { /* ok */ } } @Test public void testAddStateStoreWithDuplicates() { - builder.addStateStore(new MockStateStoreSupplier("store", false)); + builder.addStateStore(storeBuilder); try { - builder.addStateStore(new MockStateStoreSupplier("store", false)); + builder.addStateStore(storeBuilder); fail("Should throw TopologyException with store name conflict"); } catch (final TopologyException expected) { /* ok */ } } - @SuppressWarnings("deprecation") @Test public void testAddStateStore() { - final StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false); - builder.addStateStore(supplier); + builder.addStateStore(storeBuilder); builder.setApplicationId("X"); 
builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); assertEquals(0, builder.build(null).stateStores().size()); - builder.connectProcessorAndStateStores("processor-1", "store-1"); + builder.connectProcessorAndStateStores("processor-1", storeBuilder.name()); final List<StateStore> suppliers = builder.build(null).stateStores(); assertEquals(1, suppliers.size()); - assertEquals(supplier.name(), suppliers.get(0).name()); + assertEquals(storeBuilder.name(), suppliers.get(0).name()); } @Test @@ -346,7 +344,6 @@ public void testTopicGroups() { assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); } - @SuppressWarnings("deprecation") @Test public void testTopicGroupsByStateStore() { builder.setApplicationId("X"); @@ -358,15 +355,14 @@ public void testTopicGroupsByStateStore() { builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2"); - builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2"); + builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-1", "processor-2"); builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3"); builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4"); - builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4"); + builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-3", "processor-4"); builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5"); - final StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false); - builder.addStateStore(supplier); + 
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-3"), Serdes.ByteArray(), Serdes.ByteArray())); builder.connectProcessorAndStateStores("processor-5", "store-3"); final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); @@ -415,17 +411,17 @@ public void testBuild() { } @Test(expected = NullPointerException.class) - public void shouldNotAllowNullNameWhenAddingSink() throws Exception { + public void shouldNotAllowNullNameWhenAddingSink() { builder.addSink(null, "topic", null, null, null); } @Test(expected = NullPointerException.class) - public void shouldNotAllowNullTopicWhenAddingSink() throws Exception { + public void shouldNotAllowNullTopicWhenAddingSink() { builder.addSink("name", null, null, null, null); } @Test(expected = NullPointerException.class) - public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception { + public void shouldNotAllowNullNameWhenAddingProcessor() { builder.addProcessor(null, new ProcessorSupplier() { @Override public Processor get() { @@ -435,39 +431,38 @@ public Processor get() { } @Test(expected = NullPointerException.class) - public void shouldNotAllowNullProcessorSupplier() throws Exception { + public void shouldNotAllowNullProcessorSupplier() { builder.addProcessor("name", null); } @Test(expected = NullPointerException.class) - public void shouldNotAllowNullNameWhenAddingSource() throws Exception { + public void shouldNotAllowNullNameWhenAddingSource() { builder.addSource(null, null, null, null, null, Pattern.compile(".*")); } @Test(expected = NullPointerException.class) - public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception { + public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() { builder.connectProcessorAndStateStores(null, "store"); } @Test(expected = NullPointerException.class) - public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() throws 
Exception { + public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() { builder.connectProcessorAndStateStores("processor", new String[]{null}); } @Test(expected = NullPointerException.class) - public void shouldNotAddNullInternalTopic() throws Exception { + public void shouldNotAddNullInternalTopic() { builder.addInternalTopic(null); } @Test(expected = NullPointerException.class) - public void shouldNotSetApplicationIdToNull() throws Exception { + public void shouldNotSetApplicationIdToNull() { builder.setApplicationId(null); } - @SuppressWarnings("deprecation") @Test(expected = NullPointerException.class) - public void shouldNotAddNullStateStoreSupplier() throws Exception { - builder.addStateStore((StateStoreSupplier) null); + public void shouldNotAddNullStateStoreSupplier() { + builder.addStateStore((StoreBuilder) null); } private Set<String> nodeNames(final Collection<ProcessorNode> nodes) { @@ -479,44 +474,43 @@ public void shouldNotAddNullStateStoreSupplier() throws Exception { } @Test - public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception { + public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() { builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); + builder.addStateStore(storeBuilder, "processor"); final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store")); } @Test - public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception { + public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() { builder.addSource(null, "source", null, null, null, "topic"); 
builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); + builder.addStateStore(storeBuilder, "processor"); final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store")); } @Test - public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception { + public void shouldCorrectlyMapStateStoreToInternalTopics() { builder.setApplicationId("appId"); builder.addInternalTopic("internal-topic"); builder.addSource(null, "source", null, null, null, "internal-topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); + builder.addStateStore(storeBuilder, "processor"); final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store")); } - @SuppressWarnings("unchecked") @Test - public void shouldAddInternalTopicConfigForWindowStores() throws Exception { + public void shouldAddInternalTopicConfigForWindowStores() { builder.setApplicationId("appId"); builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor"); + builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 30000, 3, 10000, false), Serdes.String(), Serdes.String()), "processor"); final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = 
builder.topicGroups(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); @@ -528,13 +522,12 @@ public void shouldAddInternalTopicConfigForWindowStores() throws Exception { assertTrue(topicConfig instanceof WindowedChangelogTopicConfig); } - @SuppressWarnings("unchecked") @Test - public void shouldAddInternalTopicConfigForNonWindowStores() throws Exception { + public void shouldAddInternalTopicConfigForNonWindowStores() { builder.setApplicationId("appId"); builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new MockStateStoreSupplier("store", true), "processor"); + builder.addStateStore(storeBuilder, "processor"); final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); @@ -545,9 +538,8 @@ public void shouldAddInternalTopicConfigForNonWindowStores() throws Exception { assertTrue(topicConfig instanceof UnwindowedChangelogTopicConfig); } - @SuppressWarnings("unchecked") @Test - public void shouldAddInternalTopicConfigForRepartitionTopics() throws Exception { + public void shouldAddInternalTopicConfigForRepartitionTopics() { builder.setApplicationId("appId"); builder.addInternalTopic("foo"); builder.addSource(null, "source", null, null, null, "foo"); @@ -561,7 +553,6 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() throws Exception assertTrue(topicConfig instanceof RepartitionTopicConfig); } - @SuppressWarnings("deprecation") @Test public void shouldThrowOnUnassignedStateStoreAccess() { final String sourceNodeName = "source"; @@ -572,12 +563,11 @@ public void 
shouldThrowOnUnassignedStateStoreAccess() { config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); - final StreamsConfig streamsConfig = new StreamsConfig(config); builder.addSource(null, sourceNodeName, null, null, null, "topic"); builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName); builder.addStateStore( - Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), + Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(LocalMockProcessorSupplier.STORE_NAME), Serdes.String(), Serdes.String()), goodNodeName); builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); @@ -641,17 +631,15 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception } - @SuppressWarnings("unchecked") @Test - public void shouldAddTimestampExtractorPerSource() throws Exception { + public void shouldAddTimestampExtractorPerSource() { builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic"); final ProcessorTopology processorTopology = builder.build(null); assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } - @SuppressWarnings("unchecked") @Test - public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception { + public void shouldAddTimestampExtractorWithPatternPerSource() { final Pattern pattern = Pattern.compile("t.*"); builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern); final ProcessorTopology processorTopology = builder.build(null); @@ -659,7 +647,7 @@ public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception { } @Test - public void shouldSortProcessorNodesCorrectly() throws Exception { + public void shouldSortProcessorNodesCorrectly() { 
builder.addSource(null, "source1", null, null, null, "topic1"); builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); @@ -702,11 +690,12 @@ public void shouldSortProcessorNodesCorrectly() throws Exception { assertEquals(1, node.size); } + @SuppressWarnings("unchecked") @Test public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+")); builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest"); - builder.addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor"); + builder.addStateStore(storeBuilder, "my-processor"); final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates(); final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); @@ -722,7 +711,7 @@ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { builder.setApplicationId("test-app"); final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics(); - final List<String> topics = stateStoreAndTopics.get("testStateStore"); + final List<String> topics = stateStoreAndTopics.get(storeBuilder.name()); assertTrue("Expected to contain two topics", topics.size() == 2); @@ -731,11 +720,12 @@ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { assertFalse(topics.contains("topic-A")); } + @SuppressWarnings("unchecked") @Test(expected = TopologyException.class) public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { final String sameNameForSourceAndProcessor = "sameName"; builder.addGlobalStore( - new MockStateStoreSupplier("anyName", false, false), + (StoreBuilder<KeyValueStore>) storeBuilder, sameNameForSourceAndProcessor, null, null, diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 51d4e05b5a2..9f2b2422ba2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -25,15 +25,14 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyTestDriverWrapper; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.test.ConsumerRecordFactory; @@ -64,7 +63,7 @@ private static final String OUTPUT_TOPIC_2 = "output-topic-2"; private static final String THROUGH_TOPIC_1 = "through-topic-1"; - private final TopologyBuilder builder = new TopologyBuilder(); + private final TopologyWrapper topology = new TopologyWrapper(); private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier(); private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L); @@ -94,36 +93,36 @@ public void cleanup() { @Test public void testTopologyMetadata() { - builder.setApplicationId("X"); + 
topology.setApplicationId("X"); - builder.addSource("source-1", "topic-1"); - builder.addSource("source-2", "topic-2", "topic-3"); - builder.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1"); - builder.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1", "source-2"); - builder.addSink("sink-1", "topic-3", "processor-1"); - builder.addSink("sink-2", "topic-4", "processor-1", "processor-2"); + topology.addSource("source-1", "topic-1"); + topology.addSource("source-2", "topic-2", "topic-3"); + topology.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1"); + topology.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1", "source-2"); + topology.addSink("sink-1", "topic-3", "processor-1"); + topology.addSink("sink-2", "topic-4", "processor-1", "processor-2"); - final ProcessorTopology topology = builder.build(null); + final ProcessorTopology processorTopology = topology.getInternalBuilder().build(); - assertEquals(6, topology.processors().size()); + assertEquals(6, processorTopology.processors().size()); - assertEquals(2, topology.sources().size()); + assertEquals(2, processorTopology.sources().size()); - assertEquals(3, topology.sourceTopics().size()); + assertEquals(3, processorTopology.sourceTopics().size()); - assertNotNull(topology.source("topic-1")); + assertNotNull(processorTopology.source("topic-1")); - assertNotNull(topology.source("topic-2")); + assertNotNull(processorTopology.source("topic-2")); - assertNotNull(topology.source("topic-3")); + assertNotNull(processorTopology.source("topic-3")); - assertEquals(topology.source("topic-2"), topology.source("topic-3")); + assertEquals(processorTopology.source("topic-2"), processorTopology.source("topic-3")); } @Test public void testDrivingSimpleTopology() { int partition = 10; - driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props); + driver = new 
TopologyTestDriverWrapper(createSimpleTopology(partition), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1")); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition); assertNoOutputRecord(OUTPUT_TOPIC_2); @@ -144,7 +143,7 @@ public void testDrivingSimpleTopology() { @Test public void testDrivingMultiplexingTopology() { - driver = new TopologyTestDriverWrapper(createMultiplexingTopology().internalTopologyBuilder, props); + driver = new TopologyTestDriverWrapper(createMultiplexingTopology(), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1")); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)"); @@ -166,7 +165,7 @@ public void testDrivingMultiplexingTopology() { @Test public void testDrivingMultiplexByNameTopology() { - driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology().internalTopologyBuilder, props); + driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology(), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1")); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)"); @@ -189,7 +188,7 @@ public void testDrivingMultiplexByNameTopology() { @Test public void testDrivingStatefulTopology() { String storeName = "entries"; - driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName).internalTopologyBuilder, props); + driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1")); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2")); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3")); @@ -203,19 +202,17 @@ public void testDrivingStatefulTopology() { assertNull(store.get("key4")); } - @SuppressWarnings("unchecked") @Test public void shouldDriveGlobalStore() { final String 
storeName = "my-store"; - final StateStoreSupplier storeSupplier = Stores.create(storeName) - .withStringKeys().withStringValues().inMemory().disableLogging().build(); final String global = "global"; final String topic = "topic"; - final TopologyBuilder topologyBuilder = this.builder - .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName))); - driver = new TopologyTestDriverWrapper(topologyBuilder.internalTopologyBuilder, props); - final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store"); + topology.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()).withLoggingDisabled(), + global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName))); + + driver = new TopologyTestDriverWrapper(topology.getInternalBuilder(), props); + final KeyValueStore<String, String> globalStore = driver.getKeyValueStore(storeName); driver.pipeInput(recordFactory.create(topic, "key1", "value1")); driver.pipeInput(recordFactory.create(topic, "key2", "value2")); assertEquals("value1", globalStore.get("key1")); @@ -225,7 +222,7 @@ public void shouldDriveGlobalStore() { @Test public void testDrivingSimpleMultiSourceTopology() { final int partition = 10; - driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition).internalTopologyBuilder, props); + driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1")); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition); @@ -238,7 +235,7 @@ public void testDrivingSimpleMultiSourceTopology() { @Test public void testDrivingForwardToSourceTopology() { - driver = new TopologyTestDriverWrapper(createForwardToSourceTopology().internalTopologyBuilder, 
props); + driver = new TopologyTestDriverWrapper(createForwardToSourceTopology(), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1")); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2")); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3")); @@ -249,7 +246,7 @@ public void testDrivingForwardToSourceTopology() { @Test public void testDrivingInternalRepartitioningTopology() { - driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology().internalTopologyBuilder, props); + driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology(), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1")); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2")); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3")); @@ -260,7 +257,7 @@ public void testDrivingInternalRepartitioningTopology() { @Test public void testDrivingInternalRepartitioningForwardingTimestampTopology() { - driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder, props); + driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology(), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1@1000")); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2@2000")); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3@3000")); @@ -274,29 +271,29 @@ public void testDrivingInternalRepartitioningForwardingTimestampTopology() { @Test public void shouldCreateStringWithSourceAndTopics() { - builder.addSource("source", "topic1", "topic2"); - final ProcessorTopology topology = builder.build(null); - final String result = topology.toString(); + topology.addSource("source", "topic1", "topic2"); + final ProcessorTopology processorTopology = topology.getInternalBuilder().build(); + final String result = 
processorTopology.toString(); assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n")); } @Test public void shouldCreateStringWithMultipleSourcesAndTopics() { - builder.addSource("source", "topic1", "topic2"); - builder.addSource("source2", "t", "t1", "t2"); - final ProcessorTopology topology = builder.build(null); - final String result = topology.toString(); + topology.addSource("source", "topic1", "topic2"); + topology.addSource("source2", "t", "t1", "t2"); + final ProcessorTopology processorTopology = topology.getInternalBuilder().build(); + final String result = processorTopology.toString(); assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n")); assertThat(result, containsString("source2:\n\t\ttopics:\t\t[t, t1, t2]\n")); } @Test public void shouldCreateStringWithProcessors() { - builder.addSource("source", "t") + topology.addSource("source", "t") .addProcessor("processor", mockProcessorSupplier, "source") .addProcessor("other", mockProcessorSupplier, "source"); - final ProcessorTopology topology = builder.build(null); - final String result = topology.toString(); + final ProcessorTopology processorTopology = topology.getInternalBuilder().build(); + final String result = processorTopology.toString(); assertThat(result, containsString("\t\tchildren:\t[processor, other]")); assertThat(result, containsString("processor:\n")); assertThat(result, containsString("other:\n")); @@ -304,14 +301,14 @@ public void shouldCreateStringWithProcessors() { @Test public void shouldRecursivelyPrintChildren() { - builder.addSource("source", "t") + topology.addSource("source", "t") .addProcessor("processor", mockProcessorSupplier, "source") .addProcessor("child-one", mockProcessorSupplier, "processor") .addProcessor("child-one-one", mockProcessorSupplier, "child-one") .addProcessor("child-two", mockProcessorSupplier, "processor") .addProcessor("child-two-one", mockProcessorSupplier, "child-two"); - final String result = 
builder.build(null).toString(); + final String result = topology.getInternalBuilder().build().toString(); assertThat(result, containsString("child-one:\n\t\tchildren:\t[child-one-one]")); assertThat(result, containsString("child-two:\n\t\tchildren:\t[child-two-one]")); } @@ -319,7 +316,7 @@ public void shouldRecursivelyPrintChildren() { @Test public void shouldConsiderTimeStamps() { final int partition = 10; - driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props); + driver = new TopologyTestDriverWrapper(createSimpleTopology(partition), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L)); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L)); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L)); @@ -331,7 +328,7 @@ public void shouldConsiderTimeStamps() { @Test public void shouldConsiderModifiedTimeStamps() { final int partition = 10; - driver = new TopologyTestDriverWrapper(createTimestampTopology(partition).internalTopologyBuilder, props); + driver = new TopologyTestDriverWrapper(createTimestampTopology(partition), props); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L)); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L)); driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L)); @@ -379,80 +376,92 @@ public Integer partition(final Object key, final Object value, final int numPart }; } - private TopologyBuilder createSimpleTopology(final int partition) { - return builder + private InternalTopologyBuilder createSimpleTopology(final int partition) { + return ((TopologyWrapper) topology .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new ForwardingProcessor()), "source") - .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"); + .addSink("sink", OUTPUT_TOPIC_1, 
constantPartitioner(partition), "processor")) + .getInternalBuilder(); } - private TopologyBuilder createTimestampTopology(final int partition) { - return builder + private InternalTopologyBuilder createTimestampTopology(final int partition) { + return ((TopologyWrapper) topology .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new TimestampProcessor()), "source") - .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"); + .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor")) + .getInternalBuilder(); } - private TopologyBuilder createMultiplexingTopology() { - return builder + private InternalTopologyBuilder createMultiplexingTopology() { + return ((TopologyWrapper) topology .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new MultiplexingProcessor(2)), "source") .addSink("sink1", OUTPUT_TOPIC_1, "processor") - .addSink("sink2", OUTPUT_TOPIC_2, "processor"); + .addSink("sink2", OUTPUT_TOPIC_2, "processor")) + .getInternalBuilder(); } - private TopologyBuilder createMultiplexByNameTopology() { - return builder + private InternalTopologyBuilder createMultiplexByNameTopology() { + return ((TopologyWrapper) topology .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source") .addSink("sink0", OUTPUT_TOPIC_1, "processor") - .addSink("sink1", OUTPUT_TOPIC_2, "processor"); + .addSink("sink1", OUTPUT_TOPIC_2, "processor")) + .getInternalBuilder(); } - private TopologyBuilder createStatefulTopology(final String storeName) { - return builder + private InternalTopologyBuilder createStatefulTopology(final String storeName) { + return ((TopologyWrapper) topology .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new StatefulProcessor(storeName)), 
"source") - .addStateStore( - Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(), - "processor" - ) - .addSink("counts", OUTPUT_TOPIC_1, "processor"); + .addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()), "processor") + .addSink("counts", OUTPUT_TOPIC_1, "processor")) + .getInternalBuilder(); } - private TopologyBuilder createInternalRepartitioningTopology() { - return builder + private InternalTopologyBuilder createInternalRepartitioningTopology() { + final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology .addSource("source", INPUT_TOPIC_1) - .addInternalTopic(THROUGH_TOPIC_1) .addSink("sink0", THROUGH_TOPIC_1, "source") .addSource("source1", THROUGH_TOPIC_1) - .addSink("sink1", OUTPUT_TOPIC_1, "source1"); + .addSink("sink1", OUTPUT_TOPIC_1, "source1")) + .getInternalBuilder(); + + internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1); + + return internalTopologyBuilder; } - private TopologyBuilder createInternalRepartitioningWithValueTimestampTopology() { - return builder + private InternalTopologyBuilder createInternalRepartitioningWithValueTimestampTopology() { + final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology .addSource("source", INPUT_TOPIC_1) - .addInternalTopic(THROUGH_TOPIC_1) .addProcessor("processor", define(new ValueTimestampProcessor()), "source") .addSink("sink0", THROUGH_TOPIC_1, "processor") .addSource("source1", THROUGH_TOPIC_1) - .addSink("sink1", OUTPUT_TOPIC_1, "source1"); + .addSink("sink1", OUTPUT_TOPIC_1, "source1")) + .getInternalBuilder(); + + internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1); + + return internalTopologyBuilder; } - private TopologyBuilder createForwardToSourceTopology() { - return builder.addSource("source-1", INPUT_TOPIC_1) + private InternalTopologyBuilder createForwardToSourceTopology() { + return ((TopologyWrapper) topology.addSource("source-1", 
INPUT_TOPIC_1) .addSink("sink-1", OUTPUT_TOPIC_1, "source-1") .addSource("source-2", OUTPUT_TOPIC_1) - .addSink("sink-2", OUTPUT_TOPIC_2, "source-2"); + .addSink("sink-2", OUTPUT_TOPIC_2, "source-2")) + .getInternalBuilder(); } - private TopologyBuilder createSimpleMultiSourceTopology(int partition) { - return builder.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + private InternalTopologyBuilder createSimpleMultiSourceTopology(int partition) { + return ((TopologyWrapper) topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1") .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1") .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2) .addProcessor("processor-2", define(new ForwardingProcessor()), "source-2") - .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2"); + .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2")) + .getInternalBuilder(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 54ea1ce7329..9090012fba2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -347,7 +347,6 @@ public void shouldCheckpointStoreOffsetsOnCommit() throws IOException { ); task.initializeStateStores(); - restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet())); final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 4e04b4985ed..0048e730318 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; @@ -40,10 +41,10 @@ import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockInternalTopicManager; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.MockStateStoreSupplier; import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.Test; @@ -328,10 +329,10 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() { public void testAssignWithPartialTopology() { builder.addSource(null, "source1", null, null, null, "topic1"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); - builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1"); + builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor1"); builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); - builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2"); + 
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor2"); List<String> topics = Utils.mkList("topic1", "topic2"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); @@ -469,11 +470,11 @@ public void testAssignWithStates() { builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1"); - builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1"); + builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-1"); builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2"); - builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2"); - builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2"); + builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-2"); + builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store3"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-2"); List<String> topics = Utils.mkList("topic1", "topic2"); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java index 665ebc0b418..5383c27ac43 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache; import 
org.apache.kafka.streams.state.internals.RocksDBSessionStore; @@ -25,16 +24,10 @@ import org.apache.kafka.streams.state.internals.RocksDBWindowStore; import org.junit.Test; -import java.util.Collections; -import java.util.Map; - import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsNot.not; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class StoresTest { @@ -104,70 +97,10 @@ public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() { Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()); } - @SuppressWarnings("deprecation") - @Test - public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() { - final StateStoreSupplier supplier = Stores.create("store") - .withKeys(Serdes.String()) - .withValues(Serdes.String()) - .inMemory() - .enableLogging(Collections.singletonMap("retention.ms", "1000")) - .build(); - - final Map<String, String> config = supplier.logConfig(); - assertTrue(supplier.loggingEnabled()); - assertEquals("1000", config.get("retention.ms")); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldCreateInMemoryStoreSupplierNotLogged() { - final StateStoreSupplier supplier = Stores.create("store") - .withKeys(Serdes.String()) - .withValues(Serdes.String()) - .inMemory() - .disableLogging() - .build(); - - assertFalse(supplier.loggingEnabled()); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldCreatePersistenStoreSupplierWithLoggedConfig() { - final StateStoreSupplier supplier = Stores.create("store") - .withKeys(Serdes.String()) - .withValues(Serdes.String()) - .persistent() - .enableLogging(Collections.singletonMap("retention.ms", "1000")) - .build(); - - final Map<String, String> config = supplier.logConfig(); - 
assertTrue(supplier.loggingEnabled()); - assertEquals("1000", config.get("retention.ms")); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldCreatePersistenStoreSupplierNotLogged() { - final StateStoreSupplier supplier = Stores.create("store") - .withKeys(Serdes.String()) - .withValues(Serdes.String()) - .persistent() - .disableLogging() - .build(); - - assertFalse(supplier.loggingEnabled()); - } - @Test public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() { - final Stores.PersistentKeyValueFactory<String, String> storeFactory = Stores.create("store") - .withKeys(Serdes.String()) - .withValues(Serdes.String()) - .persistent(); try { - storeFactory.windowed(1, 1, 1, false); + Stores.persistentWindowStore("store", 1, 1, 1, false); fail("Should have thrown illegal argument exception as number of segments is less than 2"); } catch (final IllegalArgumentException e) { // ok diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index 4ff0b907916..a061ff2117d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -16,12 +16,18 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.StateSerdes; +import 
org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.NoOpReadOnlyStore; +import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.StateStoreProviderStub; import org.junit.Before; import org.junit.Test; @@ -64,7 +70,16 @@ public void before() { } private KeyValueStore<String, String> newStoreInstance() { - return StateStoreTestUtils.newKeyValueStore(storeName, "app-id", String.class, String.class); + final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), + Serdes.String(), + Serdes.String()) + .build(); + + store.init(new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName), Serdes.String(), Serdes.String()), + new NoOpRecordCollector()), + store); + + return store; } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java deleted file mode 100644 index b25b8cbb00a..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.test.InternalMockProcessorContext; -import org.apache.kafka.test.NoOpRecordCollector; -import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class RocksDBKeyValueStoreSupplierTest { - - private static final String STORE_NAME = "name"; - private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics())); - private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), - Serdes.String(), - Serdes.String(), - new NoOpRecordCollector(), - cache); - private KeyValueStore<String, String> store; - - @After - public void close() { - store.close(); - } - - @Test - public 
void shouldCreateLoggingEnabledStoreWhenStoreLogged() { - store = createStore(true, false); - final List<ProducerRecord> logged = new ArrayList<>(); - final NoOpRecordCollector collector = new NoOpRecordCollector() { - @Override - public <K, V> void send(final String topic, - K key, - V value, - Integer partition, - Long timestamp, - Serializer<K> keySerializer, - Serializer<V> valueSerializer) { - logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value)); - } - }; - final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), - Serdes.String(), - Serdes.String(), - collector, - cache); - context.setTime(1); - store.init(context, store); - store.put("a", "b"); - assertFalse(logged.isEmpty()); - } - - @Test - public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() { - store = createStore(false, false); - final List<ProducerRecord> logged = new ArrayList<>(); - final NoOpRecordCollector collector = new NoOpRecordCollector() { - @Override - public <K, V> void send(final String topic, - K key, - V value, - Integer partition, - Long timestamp, - Serializer<K> keySerializer, - Serializer<V> valueSerializer) { - logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value)); - } - }; - final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), - Serdes.String(), - Serdes.String(), - collector, - cache); - context.setTime(1); - store.init(context, store); - store.put("a", "b"); - assertTrue(logged.isEmpty()); - } - - @Test - public void shouldHaveCachedKeyValueStoreWhenCachingEnabled() { - store = createStore(false, true); - store.init(context, store); - context.setTime(1); - store.put("a", "b"); - store.put("b", "c"); - assertThat(((WrappedStateStore) store).wrappedStore(), is(instanceOf(CachingKeyValueStore.class))); - assertThat(cache.size(), is(2L)); - } - - @Test - public void shouldReturnMeteredStoreWhenCachingAndLoggingDisabled() { - 
store = createStore(false, false); - assertThat(store, is(instanceOf(MeteredKeyValueBytesStore.class))); - } - - @Test - public void shouldReturnMeteredStoreWhenCachingDisabled() { - store = createStore(true, false); - assertThat(store, is(instanceOf(MeteredKeyValueBytesStore.class))); - } - - @Test - public void shouldHaveMeteredStoreWhenCached() { - store = createStore(false, true); - store.init(context, store); - final StreamsMetrics metrics = context.metrics(); - assertFalse(metrics.metrics().isEmpty()); - } - - @Test - public void shouldHaveMeteredStoreWhenLogged() { - store = createStore(true, false); - store.init(context, store); - final StreamsMetrics metrics = context.metrics(); - assertFalse(metrics.metrics().isEmpty()); - } - - @SuppressWarnings("unchecked") - private KeyValueStore<String, String> createStore(final boolean logged, final boolean cached) { - return new RocksDBKeyValueStoreSupplier<>(STORE_NAME, - Serdes.String(), - Serdes.String(), - logged, - Collections.EMPTY_MAP, - cached).get(); - } - -} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java deleted file mode 100644 index c50dfba2942..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.SessionWindow; -import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; -import org.apache.kafka.streams.state.SessionStore; -import org.apache.kafka.test.InternalMockProcessorContext; -import org.apache.kafka.test.NoOpRecordCollector; -import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class RocksDBSessionStoreSupplierTest { - - private static final String STORE_NAME = "name"; - private final List<ProducerRecord> logged = new ArrayList<>(); - private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics())); - private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), - Serdes.String(), 
- Serdes.String(), - new NoOpRecordCollector() { - @Override - public <K, V> void send(final String topic, - final K key, - final V value, - final Integer partition, - final Long timestamp, - final Serializer<K> keySerializer, - final Serializer<V> valueSerializer) { - logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value)); - } - }, - cache); - - private SessionStore<String, String> store; - - @After - public void close() { - store.close(); - } - - @Test - public void shouldCreateLoggingEnabledStoreWhenStoreLogged() { - store = createStore(true, false); - context.setTime(1); - store.init(context, store); - store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b"); - assertFalse(logged.isEmpty()); - } - - @Test - public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() { - store = createStore(false, false); - context.setTime(1); - store.init(context, store); - store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b"); - assertTrue(logged.isEmpty()); - } - - @Test - public void shouldReturnCachedSessionStoreWhenCachingEnabled() { - store = createStore(false, true); - store.init(context, store); - context.setTime(1); - store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b"); - store.put(new Windowed<>("b", new SessionWindow(0, 10)), "c"); - assertThat(((WrappedStateStore) store).wrappedStore(), is(instanceOf(CachingSessionStore.class))); - assertThat(cache.size(), is(2L)); - } - - @Test - public void shouldHaveMeteredStoreWhenCached() { - store = createStore(false, true); - store.init(context, store); - final StreamsMetrics metrics = context.metrics(); - assertFalse(metrics.metrics().isEmpty()); - } - - @Test - public void shouldHaveMeteredStoreWhenLogged() { - store = createStore(true, false); - store.init(context, store); - final StreamsMetrics metrics = context.metrics(); - assertFalse(metrics.metrics().isEmpty()); - } - - @Test - public void shouldHaveMeteredStoreWhenNotLoggedOrCached() { - store = createStore(false, 
false); - store.init(context, store); - final StreamsMetrics metrics = context.metrics(); - assertFalse(metrics.metrics().isEmpty()); - } - - - - private SessionStore<String, String> createStore(final boolean logged, final boolean cached) { - return new RocksDBSessionStoreSupplier<>(STORE_NAME, - 10, - Serdes.String(), - Serdes.String(), - logged, - Collections.<String, String>emptyMap(), - cached).get(); - } - -} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java deleted file mode 100644 index 7409a13c154..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; -import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.InternalMockProcessorContext; -import org.apache.kafka.test.NoOpRecordCollector; -import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class RocksDBWindowStoreSupplierTest { - - private static final String STORE_NAME = "name"; - private WindowStore<String, String> store; - private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics())); - private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), - Serdes.String(), - Serdes.String(), - new NoOpRecordCollector(), - cache); - - @After - public void close() { - if (store != null) { - store.close(); - } - } - - @Test - public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() { - store = createStore(true, false, 3); - final List<ProducerRecord> logged = new ArrayList<>(); - final NoOpRecordCollector collector = new NoOpRecordCollector() { - @Override - public <K, V> void send(final String topic, - K key, - V value, - Integer partition, - Long timestamp, - Serializer<K> keySerializer, - Serializer<V> 
valueSerializer) { - logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value)); - } - }; - final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), - Serdes.String(), - Serdes.String(), - collector, - cache); - context.setTime(1); - store.init(context, store); - store.put("a", "b"); - assertFalse(logged.isEmpty()); - } - - @Test - public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() { - store = createStore(false, false, 3); - final List<ProducerRecord> logged = new ArrayList<>(); - final NoOpRecordCollector collector = new NoOpRecordCollector() { - @Override - public <K, V> void send(final String topic, - K key, - V value, - Integer partition, - Long timestamp, - Serializer<K> keySerializer, - Serializer<V> valueSerializer) { - logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value)); - } - }; - final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), - Serdes.String(), - Serdes.String(), - collector, - cache); - context.setTime(1); - store.init(context, store); - store.put("a", "b"); - assertTrue(logged.isEmpty()); - } - - @Test - public void shouldBeCachedWindowStoreWhenCachingEnabled() { - store = createStore(false, true, 3); - store.init(context, store); - context.setTime(1); - store.put("a", "b"); - store.put("b", "c"); - assertThat(((WrappedStateStore) store).wrappedStore(), is(instanceOf(CachingWindowStore.class))); - assertThat(context.getCache().size(), is(2L)); - } - - @Test - public void shouldHaveMeteredStoreAsOuterMost() { - assertThat(createStore(false, false, 2), instanceOf(MeteredWindowStore.class)); - assertThat(createStore(false, true, 2), instanceOf(MeteredWindowStore.class)); - assertThat(createStore(true, false, 2), instanceOf(MeteredWindowStore.class)); - } - @Test - public void shouldHaveMeteredStoreWhenCached() { - store = createStore(false, true, 3); - store.init(context, store); - final 
StreamsMetrics metrics = context.metrics(); - assertFalse(metrics.metrics().isEmpty()); - } - - @Test - public void shouldHaveMeteredStoreWhenLogged() { - store = createStore(true, false, 3); - store.init(context, store); - final StreamsMetrics metrics = context.metrics(); - assertFalse(metrics.metrics().isEmpty()); - } - - @Test - public void shouldHaveMeteredStoreWhenNotLoggedOrCached() { - store = createStore(false, false, 3); - store.init(context, store); - final StreamsMetrics metrics = context.metrics(); - assertFalse(metrics.metrics().isEmpty()); - } - - @Test(expected = IllegalArgumentException.class) - public void shouldThrowIllegalArgumentExceptionIfNumSegmentsLessThanTwo() { - createStore(true, true, 1); - } - - @SuppressWarnings("unchecked") - private WindowStore<String, String> createStore(final boolean logged, final boolean cached, final int numSegments) { - return new RocksDBWindowStoreSupplier<>(STORE_NAME, - 10, - numSegments, - false, - Serdes.String(), - Serdes.String(), - 10, - logged, - Collections.<String, String>emptyMap(), - cached).get(); - } - -} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java deleted file mode 100644 index b1818c21ef1..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateSerdes; -import org.apache.kafka.test.InternalMockProcessorContext; -import org.apache.kafka.test.NoOpRecordCollector; - -import java.util.Collections; - -@SuppressWarnings("unchecked") -public class StateStoreTestUtils { - - public static <K, V> KeyValueStore<K, V> newKeyValueStore(final String name, - final String applicationId, - final Class<K> keyType, - final Class<V> valueType) { - final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name, - null, - null, - new MockTime(), - false, - Collections.<String, String>emptyMap()); - - final StateStore stateStore = supplier.get(); - stateStore.init( - new InternalMockProcessorContext( - StateSerdes.withBuiltinTypes( - ProcessorStateManager.storeChangelogTopic(applicationId, name), - keyType, - valueType), - new NoOpRecordCollector()), - stateStore); - return (KeyValueStore<K, V>) stateStore; - - } - -} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index dc045360c1f..c24122abd13 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -20,13 +20,14 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.StateDirectory; @@ -68,21 +69,13 @@ private StreamThread threadMock; private Map<TaskId, StreamTask> tasks; - @SuppressWarnings("deprecation") @Before public void before() { - final TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("the-source", topicName); - builder.addProcessor("the-processor", new MockProcessorSupplier(), "the-source"); - builder.addStateStore(Stores.create("kv-store") - .withStringKeys() - .withStringValues().inMemory().build(), "the-processor"); - - builder.addStateStore(Stores.create("window-store") - .withStringKeys() - .withStringValues() - .persistent() - .windowed(10, 10, 2, false).build(), "the-processor"); + final TopologyWrapper topology = new TopologyWrapper(); + topology.addSource("the-source", topicName); + topology.addProcessor("the-processor", new MockProcessorSupplier(), "the-source"); + topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"), 
Serdes.String(), Serdes.String()), "the-processor"); + topology.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("window-store", 10, 2, 2, false), Serdes.String(), Serdes.String()), "the-processor"); final Properties properties = new Properties(); final String applicationId = "applicationId"; @@ -96,17 +89,17 @@ public void before() { configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog"); configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog"); - builder.setApplicationId(applicationId); - final ProcessorTopology topology = builder.build(null); + topology.setApplicationId(applicationId); + final ProcessorTopology processorTopology = topology.getInternalBuilder().build(); tasks = new HashMap<>(); stateDirectory = new StateDirectory(streamsConfig, new MockTime()); - taskOne = createStreamsTask(streamsConfig, clientSupplier, topology, new TaskId(0, 0)); + taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 0)); taskOne.initializeStateStores(); tasks.put(new TaskId(0, 0), taskOne); - final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, topology, new TaskId(0, 1)); + final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 1)); taskTwo.initializeStateStores(); tasks.put(new TaskId(0, 1), taskTwo); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index b5379d7142f..06c14eef124 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -17,11 +17,13 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serdes; import 
org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.NoOpWindowStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.StateStoreProviderStub; import org.junit.Before; import org.junit.Test; @@ -42,9 +44,15 @@ public void before() { final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false); - stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class)); + stubProviderOne.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), + Serdes.serdeFrom(String.class), + Serdes.serdeFrom(String.class)) + .build()); stubProviderOne.addStore("window", new NoOpWindowStore()); - stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class)); + stubProviderTwo.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), + Serdes.serdeFrom(String.class), + Serdes.serdeFrom(String.class)) + .build()); stubProviderTwo.addStore("window", new NoOpWindowStore()); wrappingStoreProvider = new WrappingStoreProvider( ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Remove deprecated APIs from KIP-120 and KIP-182 in Streams
> ----------------------------------------------------------
>
> Key: KAFKA-6813
> URL: https://issues.apache.org/jira/browse/KAFKA-6813
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Major
> Fix For: 2.0.0
>
>
> As we move on to the next major release 2.0, we can consider removing the
> deprecated APIs from KIP-120 and KIP-182.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)