[ https://issues.apache.org/jira/browse/KAFKA-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16469741#comment-16469741 ]

ASF GitHub Bot commented on KAFKA-6813:
---------------------------------------

guozhangwang closed pull request #4976: KAFKA-6813: Remove deprecated APIs in KIP-182, Part II
URL: https://github.com/apache/kafka/pull/4976

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
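
For context, the deprecated fluent API removed here maps onto the KIP-182
StoreBuilder API roughly as follows. This is a minimal sketch, assuming a
persistent String-keyed store; the store name "my-store" is illustrative:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class StoreBuilderMigrationSketch {
        public static void main(final String[] args) {
            // Old (removed by this PR):
            //     Stores.create("my-store").withStringKeys().withStringValues()
            //           .persistent().enableCaching().build();

            // New (KIP-182): pass a bytes-store supplier plus serdes to a builder.
            final StoreBuilder<KeyValueStore<String, String>> builder =
                    Stores.keyValueStoreBuilder(
                            Stores.persistentKeyValueStore("my-store"),
                            Serdes.String(),
                            Serdes.String())
                          .withCachingEnabled();

            System.out.println(builder.name()); // my-store
        }
    }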

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 3c6539902e8..497bdac6d89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.internals.Topic;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformer;
@@ -26,12 +24,7 @@
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.HashSet;
 import java.util.Objects;
@@ -81,38 +74,6 @@ public R apply(T2 value2, T1 value1) {
         };
     }
 
-    @SuppressWarnings({"unchecked", "deprecation"})
-    static <T, K>  
org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> 
keyValueStore(final Serde<K> keySerde,
-                                                                   final 
Serde<T> aggValueSerde,
-                                                                   final 
String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
-        return storeFactory(keySerde, aggValueSerde, storeName).build();
-    }
-
-    @SuppressWarnings({"unchecked", "deprecation"})
-    static  <W extends Window, T, K> 
org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> 
windowedStore(final Serde<K> keySerde,
-                                                                               
    final Serde<T> aggValSerde,
-                                                                               
    final Windows<W> windows,
-                                                                               
    final String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
-        return storeFactory(keySerde, aggValSerde, storeName)
-                .windowed(windows.size(), windows.maintainMs(), 
windows.segments, false)
-                .build();
-    }
-
-    @SuppressWarnings("deprecation")
-    static  <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final 
Serde<K> keySerde,
-                                                                       final 
Serde<T> aggValueSerde,
-                                                                       final 
String storeName) {
-        return Stores.create(storeName)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .enableCaching();
-    }
-
     static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
         Objects.requireNonNull(valueMapper, "valueMapper can't be null");
         return new ValueMapperWithKey<K, V, VR>() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index daa2915d2c1..27b985b7160 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -22,13 +22,8 @@
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
-import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
-import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
@@ -37,9 +32,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -244,411 +236,5 @@ public static SessionBytesStoreSupplier persistentSessionStore(final String name
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
     }
-
-    /**
-     * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
-     *
-     * @param name the name of the store
-     * @return the factory that can be used to specify other options or configurations for the store; never null
-     * @deprecated use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)}
-     * {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, int)}, or {@link #inMemoryKeyValueStore(String)}
-     */
-    @Deprecated
-    public static StoreFactory create(final String name) {
-        return new StoreFactory() {
-            @Override
-            public <K> ValueFactory<K> withKeys(final Serde<K> keySerde) {
-                return new ValueFactory<K>() {
-                    @Override
-                    public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) {
-
-                        return new KeyValueFactory<K, V>() {
-
-                            @Override
-                            public InMemoryKeyValueFactory<K, V> inMemory() {
-                                return new InMemoryKeyValueFactory<K, V>() {
-                                    private int capacity = Integer.MAX_VALUE;
-                                    private final Map<String, String> logConfig = new HashMap<>();
-                                    private boolean logged = true;
-
-                                    /**
-                                     * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
-                                     * @throws IllegalArgumentException if the capacity of the store is zero or negative
-                                     */
-                                    @Override
-                                    public InMemoryKeyValueFactory<K, V> maxEntries(int capacity) {
-                                        if (capacity < 1) throw new IllegalArgumentException("The capacity must be positive");
-                                        this.capacity = capacity;
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config) {
-                                        logged = true;
-                                        logConfig.putAll(config);
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public InMemoryKeyValueFactory<K, V> disableLogging() {
-                                        logged = false;
-                                        logConfig.clear();
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public org.apache.kafka.streams.processor.StateStoreSupplier build() {
-                                        log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged);
-                                        if (capacity < Integer.MAX_VALUE) {
-                                            return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
-                                        }
-                                        return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig);
-                                    }
-                                };
-                            }
-
-                            @Override
-                            public PersistentKeyValueFactory<K, V> persistent() {
-                                return new PersistentKeyValueFactory<K, V>() {
-                                    boolean cachingEnabled;
-                                    private long windowSize;
-                                    private final Map<String, String> logConfig = new HashMap<>();
-                                    private int numSegments = 0;
-                                    private long retentionPeriod = 0L;
-                                    private boolean retainDuplicates = false;
-                                    private boolean sessionWindows;
-                                    private boolean logged = true;
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> windowed(final long windowSize, final long retentionPeriod, final int numSegments, final boolean retainDuplicates) {
-                                        if (numSegments < RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
-                                            throw new IllegalArgumentException("numSegments must be >= " + RocksDBWindowStoreSupplier.MIN_SEGMENTS);
-                                        }
-                                        this.windowSize = windowSize;
-                                        this.numSegments = numSegments;
-                                        this.retentionPeriod = retentionPeriod;
-                                        this.retainDuplicates = retainDuplicates;
-                                        this.sessionWindows = false;
-
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> sessionWindowed(final long retentionPeriod) {
-                                        this.sessionWindows = true;
-                                        this.retentionPeriod = retentionPeriod;
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config) {
-                                        logged = true;
-                                        logConfig.putAll(config);
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> disableLogging() {
-                                        logged = false;
-                                        logConfig.clear();
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public PersistentKeyValueFactory<K, V> enableCaching() {
-                                        cachingEnabled = true;
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public org.apache.kafka.streams.processor.StateStoreSupplier build() {
-                                        log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
-                                        if (sessionWindows) {
-                                            return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled);
-                                        } else if (numSegments > 0) {
-                                            return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig, cachingEnabled);
-                                        }
-                                        return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig, cachingEnabled);
-                                    }
-
-                                };
-                            }
-
-
-                        };
-                    }
-                };
-            }
-        };
-    }
-
-
-    public static abstract class StoreFactory {
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<String> withStringKeys() {
-            return withKeys(Serdes.String());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Integer}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<Integer> withIntegerKeys() {
-            return withKeys(Serdes.Integer());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Long}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<Long> withLongKeys() {
-            return withKeys(Serdes.Long());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Double}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<Double> withDoubleKeys() {
-            return withKeys(Serdes.Double());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link ByteBuffer}.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<ByteBuffer> withByteBufferKeys() {
-            return withKeys(Serdes.ByteBuffer());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be byte arrays.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<byte[]> withByteArrayKeys() {
-            return withKeys(Serdes.ByteArray());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys.
-         *
-         * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes
-         * @return the interface used to specify the type of values; never null
-         */
-        public <K> ValueFactory<K> withKeys(Class<K> keyClass) {
-            return withKeys(Serdes.serdeFrom(keyClass));
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
-         *
-         * @param keySerde  the serialization factory for keys; may be null
-         * @return          the interface used to specify the type of values; never null
-         */
-        public abstract <K> ValueFactory<K> withKeys(Serde<K> keySerde);
-    }
-
-    /**
-     * The factory for creating off-heap key-value stores.
-     *
-     * @param <K> the type of keys
-     */
-    public static abstract class ValueFactory<K> {
-        /**
-         * Use {@link String} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, String> withStringValues() {
-            return withValues(Serdes.String());
-        }
-
-        /**
-         * Use {@link Integer} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, Integer> withIntegerValues() {
-            return withValues(Serdes.Integer());
-        }
-
-        /**
-         * Use {@link Long} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, Long> withLongValues() {
-            return withValues(Serdes.Long());
-        }
-
-        /**
-         * Use {@link Double} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, Double> withDoubleValues() {
-            return withValues(Serdes.Double());
-        }
-
-        /**
-         * Use {@link ByteBuffer} for values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, ByteBuffer> withByteBufferValues() {
-            return withValues(Serdes.ByteBuffer());
-        }
-
-        /**
-         * Use byte arrays for values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, byte[]> withByteArrayValues() {
-            return withValues(Serdes.ByteArray());
-        }
-
-        /**
-         * Use values of the specified type.
-         *
-         * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serdes
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public <V> KeyValueFactory<K, V> withValues(Class<V> valueClass) {
-            return withValues(Serdes.serdeFrom(valueClass));
-        }
-
-        /**
-         * Use the specified serializer and deserializer for the values.
-         *
-         * @param valueSerde    the serialization factory for values; may be null
-         * @return              the interface used to specify the remaining key-value store options; never null
-         */
-        public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde);
-    }
-
-
-    public interface KeyValueFactory<K, V> {
-        /**
-         * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
-         * read to restore the entries if they are lost.
-         *
-         * @return the factory to create in-memory key-value stores; never null
-         */
-        InMemoryKeyValueFactory<K, V> inMemory();
-
-        /**
-         * Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka
-         * topic that can be read to restore the entries if they are lost.
-         *
-         * @return the factory to create persistent key-value stores; never null
-         */
-        PersistentKeyValueFactory<K, V> persistent();
-    }
-
-    /**
-     * The interface used to create in-memory key-value stores.
-     *
-     * @param <K> the type of keys
-     * @param <V> the type of values
-     */
-    @Deprecated
-    public interface InMemoryKeyValueFactory<K, V> {
-        /**
-         * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
-         * equivalent to not placing a limit on the number of entries.
-         *
-         * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
-         * @return this factory
-         * @throws IllegalArgumentException if the capacity is not positive
-         */
-        InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
-
-        /**
-         * Indicates that a changelog should be created for the store. The changelog will be created
-         * with the provided cleanupPolicy and configs.
-         *
-         * Note: Any unrecognized configs will be ignored.
-         * @param config    any configs that should be applied to the changelog
-         * @return  the factory to create an in-memory key-value store
-         */
-        InMemoryKeyValueFactory<K, V> enableLogging(final Map<String, String> config);
-
-        /**
-         * Indicates that a changelog should not be created for the key-value store
-         * @return the factory to create an in-memory key-value store
-         */
-        InMemoryKeyValueFactory<K, V> disableLogging();
-
-
-        /**
-         * Return the instance of StateStoreSupplier of new key-value store.
-         * @return the state store supplier; never null
-         */
-        org.apache.kafka.streams.processor.StateStoreSupplier build();
-    }
-
-    /**
-     * The interface used to create off-heap key-value stores that use a local database.
-     *
-     * @param <K> the type of keys
-     * @param <V> the type of values
-     */
-    @Deprecated
-    public interface PersistentKeyValueFactory<K, V> {
-
-        /**
-         * Set the persistent store as a windowed key-value store
-         * @param windowSize size of the windows
-         * @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
-         * @param numSegments the maximum number of segments for rolling the windowed store
-         * @param retainDuplicates whether or not to retain duplicate data within the window
-         */
-        PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);
-
-        /**
-         * Set the persistent store as a {@link SessionStore} for use with {@link org.apache.kafka.streams.kstream.SessionWindows}
-         * @param retentionPeriod period of time in milliseconds to keep each window in this store
-         */
-        PersistentKeyValueFactory<K, V> sessionWindowed(final long retentionPeriod);
-
-        /**
-         * Indicates that a changelog should be created for the store. The changelog will be created
-         * with the provided cleanupPolicy and configs.
-         *
-         * Note: Any unrecognized configs will be ignored.
-         * @param config            any configs that should be applied to the changelog
-         * @return  the factory to create a persistent key-value store
-         */
-        PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config);
-
-        /**
-         * Indicates that a changelog should not be created for the key-value store
-         * @return the factory to create a persistent key-value store
-         */
-        PersistentKeyValueFactory<K, V> disableLogging();
-
-        /**
-         * Caching should be enabled on the created store.
-         * @return the factory to create a persistent key-value store
-         */
-        PersistentKeyValueFactory<K, V> enableCaching();
-
-        /**
-         * Return the instance of StateStoreSupplier of new key-value store.
-         * @return the key-value store; never null
-         */
-        org.apache.kafka.streams.processor.StateStoreSupplier build();
-
-    }
 }
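
The windowed and session variants removed above migrate the same way. A hedged
sketch, using the replacements named in the @deprecated note (store names,
sizes, and segment counts below are illustrative; numSegments must be >= 2):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.SessionStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowStore;

    public class WindowedStoreMigrationSketch {
        public static void main(final String[] args) {
            // Replaces storeFactory(...).windowed(windowSize, retentionMs, segments, false).build():
            final StoreBuilder<WindowStore<String, Long>> windowed =
                    Stores.windowStoreBuilder(
                            Stores.persistentWindowStore("counts", 60_000L, 3, 10_000L, false),
                            Serdes.String(),
                            Serdes.Long());

            // Replaces storeFactory(...).sessionWindowed(retentionMs).build():
            final StoreBuilder<SessionStore<String, Long>> sessions =
                    Stores.sessionStoreBuilder(
                            Stores.persistentSessionStore("sessions", 60_000L),
                            Serdes.String(),
                            Serdes.Long());

            System.out.println(windowed.name() + ", " + sessions.name());
        }
    }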
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
deleted file mode 100644
index f9554212e01..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Map;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior.
- * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
- * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
- */
-@Deprecated
-public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
-
-    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
-        this(name, keySerde, valueSerde, null, logged, logConfig);
-    }
-
-    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
-        super(name, keySerde, valueSerde, time, logged, logConfig);
-    }
-
-    public KeyValueStore get() {
-        InMemoryKeyValueStore<K, V> store = new InMemoryKeyValueStore<>(name, keySerde, valueSerde);
-
-        return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "in-memory-state", time);
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
deleted file mode 100644
index 0f897ba8823..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Map;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-@Deprecated
-public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
-
-    private final int capacity;
-
-    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
-        this(name, capacity, keySerde, valueSerde, null, logged, logConfig);
-    }
-
-    private InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
-        super(name, keySerde, valueSerde, time, logged, logConfig);
-        this.capacity = capacity;
-    }
-
-    public KeyValueStore get() {
-        MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde);
-        return new MeteredKeyValueStore<>(logged ? cache.enableLogging() : cache, "in-memory-lru-state", time);
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index b99c907ab44..1957aa4753e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -37,12 +37,9 @@
  *  * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior.
  * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
  * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
-
  *
  * @param <K> The key type
  * @param <V> The value type
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 4b233f069ea..3bc56c2b670 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -27,7 +27,6 @@
  *
  * @param <K> the type of keys
  * @param <V> the type of values
- * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 @Deprecated
 public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
deleted file mode 100644
index 1552f7dfbfd..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.SessionStore;
-
-import java.util.Map;
-
-/**
- * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
- */
-@Deprecated
-public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
-
-    static final int NUM_SEGMENTS = 3;
-    private final long retentionPeriod;
-    private final SessionStoreBuilder<K, V> builder;
-
-    public RocksDBSessionStoreSupplier(String name, long retentionPeriod, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
-        super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig);
-        this.retentionPeriod = retentionPeriod;
-        builder = new SessionStoreBuilder<>(new RocksDbSessionBytesStoreSupplier(name, retentionPeriod),
-                                            keySerde,
-                                            valueSerde,
-                                            time);
-        if (cached) {
-            builder.withCachingEnabled();
-        }
-        // logged by default so we only need to worry about when it is disabled.
-        if (!logged) {
-            builder.withLoggingDisabled();
-        }
-    }
-
-    public SessionStore<K, V> get() {
-        return builder.build();
-
-    }
-
-    public long retentionPeriod() {
-        return retentionPeriod;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 281304140d5..d2b8cd2bac6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -62,8 +62,6 @@
  * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
  * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
  * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
deleted file mode 100644
index 2a82f798745..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.WindowStore;
-
-import java.util.Map;
-
-/**
- * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
- */
-@Deprecated
-public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
-    public static final int MIN_SEGMENTS = 2;
-    private final long retentionPeriod;
-    private WindowStoreBuilder<K, V> builder;
-
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
-        this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, Time.SYSTEM, windowSize, logged, logConfig, enableCaching);
-    }
-
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
-        super(name, keySerde, valueSerde, time, logged, logConfig);
-        if (numSegments < MIN_SEGMENTS) {
-            throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
-        }
-        this.retentionPeriod = retentionPeriod;
-        builder = new WindowStoreBuilder<>(new RocksDbWindowBytesStoreSupplier(name,
-                                                                               retentionPeriod,
-                                                                               numSegments,
-                                                                               windowSize,
-                                                                               retainDuplicates),
-                                           keySerde,
-                                           valueSerde,
-                                           time);
-        if (enableCaching) {
-            builder.withCachingEnabled();
-        }
-        // logged by default so we only need to worry about when it is disabled.
-        if (!logged) {
-            builder.withLoggingDisabled();
-        }
-    }
-
-    public WindowStore<K, V> get() {
-        return builder.build();
-    }
-
-    @Override
-    public long retentionPeriod() {
-        return retentionPeriod;
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index b9b7181c5c3..5a87bc57d56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -25,6 +25,8 @@
     private final String name;
     private final long retentionPeriod;
 
+    private static final int NUM_SEGMENTS = 3;
+
     public RocksDbSessionBytesStoreSupplier(final String name,
                                             final long retentionPeriod) {
         this.name = name;
@@ -36,13 +38,12 @@ public String name() {
         return name;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public SessionStore<Bytes, byte[]> get() {
         final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(
             name,
             retentionPeriod,
-            org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS,
+            NUM_SEGMENTS,
             new SessionKeySchema());
         return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
     }
@@ -52,11 +53,10 @@ public String metricsScope() {
         return "rocksdb-session";
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public long segmentIntervalMs() {
         return Segments.segmentInterval(
             retentionPeriod,
-            org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS);
+            NUM_SEGMENTS);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index e1521f8438c..5fbf491dfc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -28,14 +28,15 @@
     private final long windowSize;
     private final boolean retainDuplicates;
 
-    @SuppressWarnings("deprecation")
+    private static final int MIN_SEGMENTS = 2;
+
     public RocksDbWindowBytesStoreSupplier(final String name,
                                            final long retentionPeriod,
                                            final int segments,
                                            final long windowSize,
                                            final boolean retainDuplicates) {
-        if (segments < org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
-            throw new IllegalArgumentException("numSegments must be >= " + org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS);
+        if (segments < MIN_SEGMENTS) {
+            throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
         }
         this.name = name;
         this.retentionPeriod = retentionPeriod;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index b947664fe29..8c3716b3542 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -33,8 +33,6 @@
 /**
  * An in-memory LRU cache store similar to {@link MemoryLRUCache} but byte-based, not
  * record based
- *
- * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 public class ThreadCache {
     private final Logger log;
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
new file mode 100644
index 00000000000..f1067667f31
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+/**
+ * This class allows access to the {@link InternalTopologyBuilder} of a {@link Topology} object.
+ *
+ */
+public class TopologyWrapper extends Topology {
+
+    public InternalTopologyBuilder getInternalBuilder() {
+        return internalTopologyBuilder;
+    }
+
+    public void setApplicationId(String applicationId) {
+        internalTopologyBuilder.setApplicationId(applicationId);
+    }
+}
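
A short sketch of how tests use this wrapper (mirroring the
RegexSourceIntegrationTest change below; the application id and the elided
processor wiring are illustrative):

    import java.util.regex.Pattern;

    import org.apache.kafka.streams.TopologyWrapper;
    import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

    public class TopologyWrapperSketch {
        public static void main(final String[] args) {
            final TopologyWrapper topology = new TopologyWrapper();
            topology.addSource("ingest", Pattern.compile("topic-\\d+"));
            // ... addProcessor(...) / addStateStore(...) as in a real test ...

            // The wrapper exposes the otherwise inaccessible internal builder:
            topology.setApplicationId("test-app");
            final InternalTopologyBuilder internal = topology.getInternalBuilder();
            System.out.println(internal.stateStoreNameToSourceTopics());
        }
    }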
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index d0361dc675c..e5160e1db91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -29,17 +29,17 @@
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -136,8 +136,6 @@ public void testRegexMatchesTopicsAWhenCreated() throws Exception {
         final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
         final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
-
         CLUSTER.createTopic("TEST-TOPIC-1");
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -227,28 +225,27 @@ public boolean conditionMet() {
         }, STREAM_TASKS_NOT_UPDATED);
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
 
         final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-        final MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("testStateStore"), Serdes.String(), Serdes.String());
         final long thirtySecondTimeout = 30 * 1000;
 
-        final TopologyBuilder builder = new TopologyBuilder()
-                .addSource("ingest", Pattern.compile("topic-\\d+"))
-                .addProcessor("my-processor", processorSupplier, "ingest")
-                .addStateStore(stateStoreSupplier, "my-processor");
+        final TopologyWrapper topology = new TopologyWrapper();
+        topology.addSource("ingest", Pattern.compile("topic-\\d+"));
+        topology.addProcessor("my-processor", processorSupplier, "ingest");
+        topology.addStateStore(storeBuilder, "my-processor");
 
+        streams = new KafkaStreams(topology, streamsConfiguration);
 
-        streams = new KafkaStreams(builder, streamsConfiguration);
         try {
             streams.start();
 
             final TestCondition stateStoreNameBoundToSourceTopic = new TestCondition() {
                 @Override
                 public boolean conditionMet() {
-                    final Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
+                    final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics();
                     final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
                     return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
                 }
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 255c3ebb046..27f0833d760 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -18,9 +18,9 @@
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 8cb2eaebe1c..afc9be12c3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -31,7 +31,8 @@
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -43,7 +44,6 @@
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -106,16 +106,16 @@ public void initializeStore() {
     }
 
     private void initStore(final boolean enableCaching) {
-        final RocksDBSessionStoreSupplier<String, Long> supplier =
-            new RocksDBSessionStoreSupplier<>(
-                STORE_NAME,
-                GAP_MS * 3,
+        final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, GAP_MS * 3),
                 Serdes.String(),
-                Serdes.Long(),
-                false,
-                Collections.<String, String>emptyMap(),
-                enableCaching);
-        sessionStore = supplier.get();
+                Serdes.Long())
+                .withLoggingDisabled();
+
+        if (enableCaching) {
+            storeBuilder.withCachingEnabled();
+        }
+
+        sessionStore = storeBuilder.build();
         sessionStore.init(context, sessionStore);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index d1d25e9ce4a..93b233b593c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -31,8 +31,6 @@
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -547,23 +545,6 @@ public void shouldCorrectlyMapStateStoreToInternalTopics() {
         assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldAddInternalTopicConfigForWindowStores() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId("appId");
-        builder.addSource("source", "topic");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor");
-        final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
-        final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(2, properties.size());
-        assertEquals("40000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
-        assertEquals("appId-store-changelog", topicConfig.name());
-    }
-
     @Test
     public void shouldAddInternalTopicConfigForNonWindowStores() {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -594,7 +575,7 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() {
     }
 
     @Test
-    public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
+    public void shouldThrowOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
         final String badNodeName = "badGuy";
@@ -603,12 +584,11 @@ public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        final StreamsConfig streamsConfig = new StreamsConfig(config);
 
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource(sourceNodeName, "topic")
                 .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
-                .addStateStore(Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), goodNodeName)
+                .addStateStore(new MockStateStoreSupplier(LocalMockProcessorSupplier.STORE_NAME, false), goodNodeName)
                 .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
         try {
             final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, config);
@@ -724,6 +704,7 @@ public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternP
         assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
 
@@ -755,10 +736,11 @@ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
         assertFalse(topics.contains("topic-A"));
     }
 
+    @SuppressWarnings("unchecked")
     @Test(expected = TopologyBuilderException.class)
     public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
         final String sameNameForSourceAndProcessor = "sameName";
-        final TopologyBuilder topologyBuilder = new TopologyBuilder()
+        new TopologyBuilder()
             .addGlobalStore(new MockStateStoreSupplier("anyName", false, false),
                 sameNameForSourceAndProcessor,
                 null,
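
The hunks above replace the deprecated StateStoreSupplier with the KIP-182
StoreBuilder API. A minimal, self-contained sketch of the new pattern (the
class name StoreBuilderExample is invented for illustration; the Stores calls
are the same ones the updated tests use):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class StoreBuilderExample {
        public static void main(final String[] args) {
            // A builder is composed from a bytes-store supplier plus key and
            // value serdes, instead of implementing StateStoreSupplier.
            final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("store"), // supplier picks the engine
                    Serdes.String(),                       // key serde
                    Serdes.String());                      // value serde

            // build() returns a fresh store instance on every call.
            final KeyValueStore<String, String> store = storeBuilder.build();
            System.out.println(store.name());              // prints "store"
        }
    }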
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 149a1584f2d..c73593e7165 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -29,11 +29,10 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -63,8 +62,9 @@
 
 public class InternalTopologyBuilderTest {
 
-    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
     private final Serde<String> stringSerde = Serdes.String();
+    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+    private final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.ByteArray(), Serdes.ByteArray());
 
     @Test
     public void shouldAddSourceWithOffsetReset() {
@@ -266,14 +266,14 @@ public void testNamedTopicMatchesAlreadyProvidedPattern() {
 
     @Test(expected = TopologyException.class)
     public void testAddStateStoreWithNonExistingProcessor() {
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
+        builder.addStateStore(storeBuilder, "no-such-processsor");
     }
 
     @Test
     public void testAddStateStoreWithSource() {
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         try {
-            builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+            builder.addStateStore(storeBuilder, "source-1");
             fail("Should throw TopologyException with store cannot be added to source");
         } catch (final TopologyException expected) { /* ok */ }
     }
@@ -282,36 +282,34 @@ public void testAddStateStoreWithSource() {
     public void testAddStateStoreWithSink() {
         builder.addSink("sink-1", "topic-1", null, null, null);
         try {
-            builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+            builder.addStateStore(storeBuilder, "sink-1");
             fail("Should throw TopologyException with store cannot be added to sink");
         } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
     public void testAddStateStoreWithDuplicates() {
-        builder.addStateStore(new MockStateStoreSupplier("store", false));
+        builder.addStateStore(storeBuilder);
         try {
-            builder.addStateStore(new MockStateStoreSupplier("store", false));
+            builder.addStateStore(storeBuilder);
             fail("Should throw TopologyException with store name conflict");
         } catch (final TopologyException expected) { /* ok */ }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void testAddStateStore() {
-        final StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
-        builder.addStateStore(supplier);
+        builder.addStateStore(storeBuilder);
         builder.setApplicationId("X");
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         builder.addProcessor("processor-1", new MockProcessorSupplier(), 
"source-1");
 
         assertEquals(0, builder.build(null).stateStores().size());
 
-        builder.connectProcessorAndStateStores("processor-1", "store-1");
+        builder.connectProcessorAndStateStores("processor-1", storeBuilder.name());
 
         final List<StateStore> suppliers = builder.build(null).stateStores();
         assertEquals(1, suppliers.size());
-        assertEquals(supplier.name(), suppliers.get(0).name());
+        assertEquals(storeBuilder.name(), suppliers.get(0).name());
     }
 
     @Test
@@ -346,7 +344,6 @@ public void testTopicGroups() {
         assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void testTopicGroupsByStateStore() {
         builder.setApplicationId("X");
@@ -358,15 +355,14 @@ public void testTopicGroupsByStateStore() {

         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
-        builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-1", "processor-2");

         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
         builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
-        builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-3", "processor-4");

         builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
-        final StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false);
-        builder.addStateStore(supplier);
+        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-3"), Serdes.ByteArray(), Serdes.ByteArray()));
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
@@ -415,17 +411,17 @@ public void testBuild() {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
+    public void shouldNotAllowNullNameWhenAddingSink() {
         builder.addSink(null, "topic", null, null, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
+    public void shouldNotAllowNullTopicWhenAddingSink() {
         builder.addSink("name", null, null, null, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
+    public void shouldNotAllowNullNameWhenAddingProcessor() {
         builder.addProcessor(null, new ProcessorSupplier() {
             @Override
             public Processor get() {
@@ -435,39 +431,38 @@ public Processor get() {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullProcessorSupplier() throws Exception {
+    public void shouldNotAllowNullProcessorSupplier() {
         builder.addProcessor("name", null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
+    public void shouldNotAllowNullNameWhenAddingSource() {
         builder.addSource(null, null, null, null, null, Pattern.compile(".*"));
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception {
+    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
         builder.connectProcessorAndStateStores(null, "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() throws Exception {
+    public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() {
         builder.connectProcessorAndStateStores("processor", new String[]{null});
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAddNullInternalTopic() throws Exception {
+    public void shouldNotAddNullInternalTopic() {
         builder.addInternalTopic(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotSetApplicationIdToNull() throws Exception {
+    public void shouldNotSetApplicationIdToNull() {
         builder.setApplicationId(null);
     }
 
-    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotAddNullStateStoreSupplier() throws Exception {
-        builder.addStateStore((StateStoreSupplier) null);
+    public void shouldNotAddNullStateStoreSupplier() {
+        builder.addStateStore((StoreBuilder) null);
     }
 
     private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {
@@ -479,44 +474,43 @@ public void shouldNotAddNullStateStoreSupplier() throws Exception {
     }
 
     @Test
-    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
+        builder.addStateStore(storeBuilder, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
     }
 
     @Test
-    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
+        builder.addStateStore(storeBuilder, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
     }
 
     @Test
-    public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
+    public void shouldCorrectlyMapStateStoreToInternalTopics() {
         builder.setApplicationId("appId");
         builder.addInternalTopic("internal-topic");
         builder.addSource(null, "source", null, null, null, "internal-topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
+        builder.addStateStore(storeBuilder, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigForWindowStores() throws Exception {
+    public void shouldAddInternalTopicConfigForWindowStores() {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor");
+        builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 30000, 3, 10000, false), Serdes.String(), Serdes.String()), "processor");
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
@@ -528,13 +522,12 @@ public void shouldAddInternalTopicConfigForWindowStores() throws Exception {
         assertTrue(topicConfig instanceof WindowedChangelogTopicConfig);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigForNonWindowStores() throws Exception {
+    public void shouldAddInternalTopicConfigForNonWindowStores() {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", true), "processor");
+        builder.addStateStore(storeBuilder, "processor");
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
@@ -545,9 +538,8 @@ public void shouldAddInternalTopicConfigForNonWindowStores() throws Exception {
         assertTrue(topicConfig instanceof UnwindowedChangelogTopicConfig);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigForRepartitionTopics() throws Exception {
+    public void shouldAddInternalTopicConfigForRepartitionTopics() {
         builder.setApplicationId("appId");
         builder.addInternalTopic("foo");
         builder.addSource(null, "source", null, null, null, "foo");
@@ -561,7 +553,6 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() throws Exception
         assertTrue(topicConfig instanceof RepartitionTopicConfig);
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldThrowOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
@@ -572,12 +563,11 @@ public void shouldThrowOnUnassignedStateStoreAccess() {
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        final StreamsConfig streamsConfig = new StreamsConfig(config);

         builder.addSource(null, sourceNodeName, null, null, null, "topic");
         builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
         builder.addStateStore(
-            Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(LocalMockProcessorSupplier.STORE_NAME), Serdes.String(), Serdes.String()),
             goodNodeName);
         builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
         
@@ -641,17 +631,15 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddTimestampExtractorPerSource() throws Exception {
+    public void shouldAddTimestampExtractorPerSource() {
         builder.addSource(null, "source", new MockTimestampExtractor(), null, 
null, "topic");
         final ProcessorTopology processorTopology = builder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
+    public void shouldAddTimestampExtractorWithPatternPerSource() {
         final Pattern pattern = Pattern.compile("t.*");
         builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern);
         final ProcessorTopology processorTopology = builder.build(null);
@@ -659,7 +647,7 @@ public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
     }
 
     @Test
-    public void shouldSortProcessorNodesCorrectly() throws Exception {
+    public void shouldSortProcessorNodesCorrectly() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor1", new MockProcessorSupplier(), 
"source1");
@@ -702,11 +690,12 @@ public void shouldSortProcessorNodesCorrectly() throws Exception {
         assertEquals(1, node.size);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
         builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+"));
         builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest");
-        builder.addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
+        builder.addStateStore(storeBuilder, "my-processor");

         final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates();
         final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
@@ -722,7 +711,7 @@ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
         builder.setApplicationId("test-app");
 
         final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics();
-        final List<String> topics = stateStoreAndTopics.get("testStateStore");
+        final List<String> topics = stateStoreAndTopics.get(storeBuilder.name());
 
         assertTrue("Expected to contain two topics", topics.size() == 2);
 
@@ -731,11 +720,12 @@ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
         assertFalse(topics.contains("topic-A"));
     }
 
+    @SuppressWarnings("unchecked")
     @Test(expected = TopologyException.class)
     public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
         final String sameNameForSourceAndProcessor = "sameName";
         builder.addGlobalStore(
-            new MockStateStoreSupplier("anyName", false, false),
+            (StoreBuilder<KeyValueStore>) storeBuilder,
             sameNameForSourceAndProcessor,
             null,
             null,
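
The window-store hunk in this file shows the same migration for windowed
state: Stores.windowStoreBuilder wrapping Stores.persistentWindowStore
replaces the internal RocksDBWindowStoreSupplier. A sketch under the same
parameters the test passes (retention 30000 ms, 3 segments, window size
10000 ms, no duplicate retention); the class name is invented for
illustration:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowStore;

    public class WindowStoreBuilderExample {
        public static void main(final String[] args) {
            // Supplier carries the windowing parameters; serdes are attached
            // by the builder, mirroring the key-value case.
            final StoreBuilder<WindowStore<String, String>> windowStoreBuilder =
                Stores.windowStoreBuilder(
                    Stores.persistentWindowStore("store", 30000L, 3, 10000L, false),
                    Serdes.String(),
                    Serdes.String());
            System.out.println(windowStoreBuilder.name()); // prints "store"
        }
    }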
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 51d4e05b5a2..9f2b2422ba2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -25,15 +25,14 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -64,7 +63,7 @@
     private static final String OUTPUT_TOPIC_2 = "output-topic-2";
     private static final String THROUGH_TOPIC_1 = "through-topic-1";
 
-    private final TopologyBuilder builder = new TopologyBuilder();
+    private final TopologyWrapper topology = new TopologyWrapper();
     private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
 
@@ -94,36 +93,36 @@ public void cleanup() {
 
     @Test
     public void testTopologyMetadata() {
-        builder.setApplicationId("X");
+        topology.setApplicationId("X");
 
-        builder.addSource("source-1", "topic-1");
-        builder.addSource("source-2", "topic-2", "topic-3");
-        builder.addProcessor("processor-1", new MockProcessorSupplier<>(), 
"source-1");
-        builder.addProcessor("processor-2", new MockProcessorSupplier<>(), 
"source-1", "source-2");
-        builder.addSink("sink-1", "topic-3", "processor-1");
-        builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
+        topology.addSource("source-1", "topic-1");
+        topology.addSource("source-2", "topic-2", "topic-3");
+        topology.addProcessor("processor-1", new MockProcessorSupplier<>(), 
"source-1");
+        topology.addProcessor("processor-2", new MockProcessorSupplier<>(), 
"source-1", "source-2");
+        topology.addSink("sink-1", "topic-3", "processor-1");
+        topology.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
-        final ProcessorTopology topology = builder.build(null);
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
 
-        assertEquals(6, topology.processors().size());
+        assertEquals(6, processorTopology.processors().size());
 
-        assertEquals(2, topology.sources().size());
+        assertEquals(2, processorTopology.sources().size());
 
-        assertEquals(3, topology.sourceTopics().size());
+        assertEquals(3, processorTopology.sourceTopics().size());
 
-        assertNotNull(topology.source("topic-1"));
+        assertNotNull(processorTopology.source("topic-1"));
 
-        assertNotNull(topology.source("topic-2"));
+        assertNotNull(processorTopology.source("topic-2"));
 
-        assertNotNull(topology.source("topic-3"));
+        assertNotNull(processorTopology.source("topic-3"));
 
-        assertEquals(topology.source("topic-2"), topology.source("topic-3"));
+        assertEquals(processorTopology.source("topic-2"), 
processorTopology.source("topic-3"));
     }
 
     @Test
     public void testDrivingSimpleTopology() {
         int partition = 10;
-        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
@@ -144,7 +143,7 @@ public void testDrivingSimpleTopology() {
 
     @Test
     public void testDrivingMultiplexingTopology() {
-        driver = new TopologyTestDriverWrapper(createMultiplexingTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createMultiplexingTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -166,7 +165,7 @@ public void testDrivingMultiplexingTopology() {
 
     @Test
     public void testDrivingMultiplexByNameTopology() {
-        driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -189,7 +188,7 @@ public void testDrivingMultiplexByNameTopology() {
     @Test
     public void testDrivingStatefulTopology() {
         String storeName = "entries";
-        driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -203,19 +202,17 @@ public void testDrivingStatefulTopology() {
         assertNull(store.get("key4"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldDriveGlobalStore() {
         final String storeName = "my-store";
-        final StateStoreSupplier storeSupplier = Stores.create(storeName)
-                .withStringKeys().withStringValues().inMemory().disableLogging().build();
         final String global = "global";
         final String topic = "topic";
-        final TopologyBuilder topologyBuilder = this.builder
-                .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));

-        driver = new TopologyTestDriverWrapper(topologyBuilder.internalTopologyBuilder, props);
-        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
+        topology.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()).withLoggingDisabled(),
+                global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
+
+        driver = new TopologyTestDriverWrapper(topology.getInternalBuilder(), props);
+        final KeyValueStore<String, String> globalStore = driver.getKeyValueStore(storeName);
         driver.pipeInput(recordFactory.create(topic, "key1", "value1"));
         driver.pipeInput(recordFactory.create(topic, "key2", "value2"));
         assertEquals("value1", globalStore.get("key1"));
@@ -225,7 +222,7 @@ public void shouldDriveGlobalStore() {
     @Test
     public void testDrivingSimpleMultiSourceTopology() {
         final int partition = 10;
-        driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition), props);

         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
@@ -238,7 +235,7 @@ public void testDrivingSimpleMultiSourceTopology() {
 
     @Test
     public void testDrivingForwardToSourceTopology() {
-        driver = new TopologyTestDriverWrapper(createForwardToSourceTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createForwardToSourceTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -249,7 +246,7 @@ public void testDrivingForwardToSourceTopology() {
 
     @Test
     public void testDrivingInternalRepartitioningTopology() {
-        driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -260,7 +257,7 @@ public void testDrivingInternalRepartitioningTopology() {
 
     @Test
     public void testDrivingInternalRepartitioningForwardingTimestampTopology() 
{
-        driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1@1000"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2@2000"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3@3000"));
@@ -274,29 +271,29 @@ public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
 
     @Test
     public void shouldCreateStringWithSourceAndTopics() {
-        builder.addSource("source", "topic1", "topic2");
-        final ProcessorTopology topology = builder.build(null);
-        final String result = topology.toString();
+        topology.addSource("source", "topic1", "topic2");
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
+        final String result = processorTopology.toString();
         assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
     }
 
     @Test
     public void shouldCreateStringWithMultipleSourcesAndTopics() {
-        builder.addSource("source", "topic1", "topic2");
-        builder.addSource("source2", "t", "t1", "t2");
-        final ProcessorTopology topology = builder.build(null);
-        final String result = topology.toString();
+        topology.addSource("source", "topic1", "topic2");
+        topology.addSource("source2", "t", "t1", "t2");
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
+        final String result = processorTopology.toString();
         assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
         assertThat(result, containsString("source2:\n\t\ttopics:\t\t[t, t1, t2]\n"));
     }
 
     @Test
     public void shouldCreateStringWithProcessors() {
-        builder.addSource("source", "t")
+        topology.addSource("source", "t")
                 .addProcessor("processor", mockProcessorSupplier, "source")
                 .addProcessor("other", mockProcessorSupplier, "source");
-        final ProcessorTopology topology = builder.build(null);
-        final String result = topology.toString();
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
+        final String result = processorTopology.toString();
         assertThat(result, containsString("\t\tchildren:\t[processor, other]"));
         assertThat(result, containsString("processor:\n"));
         assertThat(result, containsString("other:\n"));
@@ -304,14 +301,14 @@ public void shouldCreateStringWithProcessors() {
 
     @Test
     public void shouldRecursivelyPrintChildren() {
-        builder.addSource("source", "t")
+        topology.addSource("source", "t")
                 .addProcessor("processor", mockProcessorSupplier, "source")
                 .addProcessor("child-one", mockProcessorSupplier, "processor")
                 .addProcessor("child-one-one", mockProcessorSupplier, 
"child-one")
                 .addProcessor("child-two", mockProcessorSupplier, "processor")
                 .addProcessor("child-two-one", mockProcessorSupplier, 
"child-two");
 
-        final String result = builder.build(null).toString();
+        final String result = topology.getInternalBuilder().build().toString();
         assertThat(result, containsString("child-one:\n\t\tchildren:\t[child-one-one]"));
         assertThat(result, containsString("child-two:\n\t\tchildren:\t[child-two-one]"));
     }
@@ -319,7 +316,7 @@ public void shouldRecursivelyPrintChildren() {
     @Test
     public void shouldConsiderTimeStamps() {
         final int partition = 10;
-        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
@@ -331,7 +328,7 @@ public void shouldConsiderTimeStamps() {
     @Test
     public void shouldConsiderModifiedTimeStamps() {
         final int partition = 10;
-        driver = new TopologyTestDriverWrapper(createTimestampTopology(partition).internalTopologyBuilder, props);
+        driver = new TopologyTestDriverWrapper(createTimestampTopology(partition), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
@@ -379,80 +376,92 @@ public Integer partition(final Object key, final Object value, final int numPart
         };
     }
 
-    private TopologyBuilder createSimpleTopology(final int partition) {
-        return builder
+    private InternalTopologyBuilder createSimpleTopology(final int partition) {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
             .addProcessor("processor", define(new ForwardingProcessor()), 
"source")
-            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), 
"processor");
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), 
"processor"))
+            .getInternalBuilder();
     }
 
-    private TopologyBuilder createTimestampTopology(final int partition) {
-        return builder
+    private InternalTopologyBuilder createTimestampTopology(final int partition) {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new TimestampProcessor()), "source")
-            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"))
+            .getInternalBuilder();
     }
 
-    private TopologyBuilder createMultiplexingTopology() {
-        return builder
+    private InternalTopologyBuilder createMultiplexingTopology() {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
             .addProcessor("processor", define(new MultiplexingProcessor(2)), 
"source")
             .addSink("sink1", OUTPUT_TOPIC_1, "processor")
-            .addSink("sink2", OUTPUT_TOPIC_2, "processor");
+            .addSink("sink2", OUTPUT_TOPIC_2, "processor"))
+            .getInternalBuilder();
     }
 
-    private TopologyBuilder createMultiplexByNameTopology() {
-        return builder
+    private InternalTopologyBuilder createMultiplexByNameTopology() {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
             .addProcessor("processor", define(new 
MultiplexByNameProcessor(2)), "source")
             .addSink("sink0", OUTPUT_TOPIC_1, "processor")
-            .addSink("sink1", OUTPUT_TOPIC_2, "processor");
+            .addSink("sink1", OUTPUT_TOPIC_2, "processor"))
+                .getInternalBuilder();
     }
 
-    private TopologyBuilder createStatefulTopology(final String storeName) {
-        return builder
+    private InternalTopologyBuilder createStatefulTopology(final String storeName) {
+        return ((TopologyWrapper) topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
-            .addStateStore(
-                Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
-                "processor"
-            )
-            .addSink("counts", OUTPUT_TOPIC_1, "processor");
+            .addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()), "processor")
+            .addSink("counts", OUTPUT_TOPIC_1, "processor"))
+            .getInternalBuilder();
     }
 
-    private TopologyBuilder createInternalRepartitioningTopology() {
-        return builder
+    private InternalTopologyBuilder createInternalRepartitioningTopology() {
+        final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology
             .addSource("source", INPUT_TOPIC_1)
-            .addInternalTopic(THROUGH_TOPIC_1)
             .addSink("sink0", THROUGH_TOPIC_1, "source")
             .addSource("source1", THROUGH_TOPIC_1)
-            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
+            .getInternalBuilder();
+
+        internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
+
+        return internalTopologyBuilder;
     }
 
-    private TopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
-        return builder
+    private InternalTopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
+        final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology
             .addSource("source", INPUT_TOPIC_1)
-            .addInternalTopic(THROUGH_TOPIC_1)
             .addProcessor("processor", define(new ValueTimestampProcessor()), 
"source")
             .addSink("sink0", THROUGH_TOPIC_1, "processor")
             .addSource("source1", THROUGH_TOPIC_1)
-            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
+            .getInternalBuilder();
+
+        internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
+
+        return internalTopologyBuilder;
     }
 
-    private TopologyBuilder createForwardToSourceTopology() {
-        return builder.addSource("source-1", INPUT_TOPIC_1)
+    private InternalTopologyBuilder createForwardToSourceTopology() {
+        return ((TopologyWrapper) topology.addSource("source-1", INPUT_TOPIC_1)
                 .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
                 .addSource("source-2", OUTPUT_TOPIC_1)
-                .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
+                .addSink("sink-2", OUTPUT_TOPIC_2, "source-2"))
+                .getInternalBuilder();
     }
 
-    private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
-        return builder.addSource("source-1", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
+    private InternalTopologyBuilder createSimpleMultiSourceTopology(int 
partition) {
+        return ((TopologyWrapper) topology.addSource("source-1", 
STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
                 .addProcessor("processor-1", define(new 
ForwardingProcessor()), "source-1")
                 .addSink("sink-1", OUTPUT_TOPIC_1, 
constantPartitioner(partition), "processor-1")
                 .addSource("source-2", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_2)
                 .addProcessor("processor-2", define(new 
ForwardingProcessor()), "source-2")
-                .addSink("sink-2", OUTPUT_TOPIC_2, 
constantPartitioner(partition), "processor-2");
+                .addSink("sink-2", OUTPUT_TOPIC_2, 
constantPartitioner(partition), "processor-2"))
+                .getInternalBuilder();
     }
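
TopologyWrapper and TopologyTestDriverWrapper used throughout this file are
test-internal helpers. The public equivalent of the same drive-and-assert
pattern, as a hedged sketch against the kafka-streams-test-utils API of this
era (the topic names and class name are invented for illustration):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    public class PassThroughTopologyExample {
        public static void main(final String[] args) {
            // A trivial source -> sink pass-through topology.
            final Topology topology = new Topology();
            topology.addSource("source", "input-topic");
            topology.addSink("sink", "output-topic", "source");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

            final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
            final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

            // Pipe a record in and read the forwarded record back out.
            driver.pipeInput(factory.create("input-topic", "key1", "value1"));
            System.out.println(driver.readOutput("output-topic",
                new StringDeserializer(), new StringDeserializer()));
            driver.close();
        }
    }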
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 54ea1ce7329..9090012fba2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -347,7 +347,6 @@ public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
         );
         task.initializeStateStores();
 
-
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));

         final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 4e04b4985ed..0048e730318 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -40,10 +41,10 @@
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Test;
@@ -328,10 +329,10 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() {
     public void testAssignWithPartialTopology() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), 
"source1");
-        builder.addStateStore(new MockStateStoreSupplier("store1", false), 
"processor1");
+        
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"),
 Serdes.ByteArray(), Serdes.ByteArray()), "processor1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor2", new MockProcessorSupplier(), 
"source2");
-        builder.addStateStore(new MockStateStoreSupplier("store2", false), 
"processor2");
+        
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"),
 Serdes.ByteArray(), Serdes.ByteArray()), "processor2");
         List<String> topics = Utils.mkList("topic1", "topic2");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
@@ -469,11 +470,11 @@ public void testAssignWithStates() {
         builder.addSource(null, "source2", null, null, null, "topic2");
 
         builder.addProcessor("processor-1", new MockProcessorSupplier(), 
"source1");
-        builder.addStateStore(new MockStateStoreSupplier("store1", false), 
"processor-1");
+        
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"),
 Serdes.ByteArray(), Serdes.ByteArray()), "processor-1");
 
         builder.addProcessor("processor-2", new MockProcessorSupplier(), 
"source2");
-        builder.addStateStore(new MockStateStoreSupplier("store2", false), 
"processor-2");
-        builder.addStateStore(new MockStateStoreSupplier("store3", false), 
"processor-2");
+        
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"),
 Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
+        
builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store3"),
 Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
 
         List<String> topics = Utils.mkList("topic1", "topic2");
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 665ebc0b418..5383c27ac43 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
 import org.apache.kafka.streams.state.internals.RocksDBSessionStore;
@@ -25,16 +24,10 @@
 import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.Map;
-
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class StoresTest {
@@ -104,70 +97,10 @@ public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() {
         Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() {
-        final StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .inMemory()
-                .enableLogging(Collections.singletonMap("retention.ms", "1000"))
-                .build();
-
-        final Map<String, String> config = supplier.logConfig();
-        assertTrue(supplier.loggingEnabled());
-        assertEquals("1000", config.get("retention.ms"));
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldCreateInMemoryStoreSupplierNotLogged() {
-        final StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .inMemory()
-                .disableLogging()
-                .build();
-
-        assertFalse(supplier.loggingEnabled());
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldCreatePersistenStoreSupplierWithLoggedConfig() {
-        final StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .persistent()
-                .enableLogging(Collections.singletonMap("retention.ms", "1000"))
-                .build();
-
-        final Map<String, String> config = supplier.logConfig();
-        assertTrue(supplier.loggingEnabled());
-        assertEquals("1000", config.get("retention.ms"));
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldCreatePersistenStoreSupplierNotLogged() {
-        final StateStoreSupplier supplier = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .persistent()
-                .disableLogging()
-                .build();
-
-        assertFalse(supplier.loggingEnabled());
-    }
-
     @Test
     public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() {
-        final Stores.PersistentKeyValueFactory<String, String> storeFactory = Stores.create("store")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.String())
-                .persistent();
         try {
-            storeFactory.windowed(1, 1, 1, false);
+            Stores.persistentWindowStore("store", 1, 1, 1, false);
             fail("Should have thrown illegal argument exception as number of segments is less than 2");
         } catch (final IllegalArgumentException e) {
          // ok
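
The rewritten test keeps the original assertion: the segment-count check now
fires in the call to Stores.persistentWindowStore itself rather than in the
removed windowed(...) factory. A sketch of the eager validation (class name
invented for illustration):

    import org.apache.kafka.streams.state.Stores;

    public class WindowStoreSegmentsCheck {
        public static void main(final String[] args) {
            try {
                // Fewer than two segments is rejected before any store is built.
                Stores.persistentWindowStore("store", 1L, 1, 1L, false);
            } catch (final IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }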
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 4ff0b907916..a061ff2117d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -16,12 +16,18 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpReadOnlyStore;
+import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,7 +70,16 @@ public void before() {
     }
 
     private KeyValueStore<String, String> newStoreInstance() {
-        return StateStoreTestUtils.newKeyValueStore(storeName, "app-id", String.class, String.class);
+        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
+                Serdes.String(),
+                Serdes.String())
+                .build();
+
+        store.init(new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName), Serdes.String(), Serdes.String()),
+                                                    new NoOpRecordCollector()),
+                store);
+
+        return store;
     }
 
     @Test
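
Related to the builder pattern used in newStoreInstance() above: StoreBuilder
also carries the logging and caching switches the old fluent factory exposed.
A sketch, assuming the public StoreBuilder methods of this release (the store
name is illustrative); logging is disabled here as required for stores passed
to addGlobalStore, which restore straight from their source topic:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class GlobalStoreBuilderExample {
        public static void main(final String[] args) {
            final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore("global-store"),
                        Serdes.String(),
                        Serdes.String())
                    .withLoggingDisabled(); // global stores must not have a changelog
            System.out.println(globalStoreBuilder.loggingEnabled()); // prints false
        }
    }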
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
deleted file mode 100644
index b25b8cbb00a..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class RocksDBKeyValueStoreSupplierTest {
-
-    private static final String STORE_NAME = "name";
-    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                          Serdes.String(),
-                                                                          Serdes.String(),
-                                                                          new NoOpRecordCollector(),
-                                                                          cache);
-    private KeyValueStore<String, String> store;
-
-    @After
-    public void close() {
-        store.close();
-    }
-
-    @Test
-    public void shouldCreateLoggingEnabledStoreWhenStoreLogged() {
-        store = createStore(true, false);
-        final List<ProducerRecord> logged = new ArrayList<>();
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
-            }
-        };
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.String(),
-                                                                      collector,
-                                                                      cache);
-        context.setTime(1);
-        store.init(context, store);
-        store.put("a", "b");
-        assertFalse(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() {
-        store = createStore(false, false);
-        final List<ProducerRecord> logged = new ArrayList<>();
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value));
-            }
-        };
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.String(),
-                                                                      collector,
-                                                                      cache);
-        context.setTime(1);
-        store.init(context, store);
-        store.put("a", "b");
-        assertTrue(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldHaveCachedKeyValueStoreWhenCachingEnabled() {
-        store = createStore(false, true);
-        store.init(context, store);
-        context.setTime(1);
-        store.put("a", "b");
-        store.put("b", "c");
-        assertThat(((WrappedStateStore) store).wrappedStore(), is(instanceOf(CachingKeyValueStore.class)));
-        assertThat(cache.size(), is(2L));
-    }
-
-    @Test
-    public void shouldReturnMeteredStoreWhenCachingAndLoggingDisabled() {
-        store = createStore(false, false);
-        assertThat(store, is(instanceOf(MeteredKeyValueBytesStore.class)));
-    }
-
-    @Test
-    public void shouldReturnMeteredStoreWhenCachingDisabled() {
-        store = createStore(true, false);
-        assertThat(store, is(instanceOf(MeteredKeyValueBytesStore.class)));
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenCached() {
-        store = createStore(false, true);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenLogged() {
-        store = createStore(true, false);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @SuppressWarnings("unchecked")
-    private KeyValueStore<String, String> createStore(final boolean logged, final boolean cached) {
-        return new RocksDBKeyValueStoreSupplier<>(STORE_NAME,
-                                                  Serdes.String(),
-                                                  Serdes.String(),
-                                                  logged,
-                                                  Collections.EMPTY_MAP,
-                                                  cached).get();
-    }
-
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
deleted file mode 100644
index c50dfba2942..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class RocksDBSessionStoreSupplierTest {
-
-    private static final String STORE_NAME = "name";
-    private final List<ProducerRecord> logged = new ArrayList<>();
-    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-        Serdes.String(),
-        Serdes.String(),
-        new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    final K key,
-                                    final V value,
-                                    final Integer partition,
-                                    final Long timestamp,
-                                    final Serializer<K> keySerializer,
-                                    final Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value));
-            }
-        },
-        cache);
-
-    private SessionStore<String, String> store;
-
-    @After
-    public void close() {
-        store.close();
-    }
-
-    @Test
-    public void shouldCreateLoggingEnabledStoreWhenStoreLogged() {
-        store = createStore(true, false);
-        context.setTime(1);
-        store.init(context, store);
-        store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
-        assertFalse(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() {
-        store = createStore(false, false);
-        context.setTime(1);
-        store.init(context, store);
-        store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
-        assertTrue(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldReturnCachedSessionStoreWhenCachingEnabled() {
-        store = createStore(false, true);
-        store.init(context, store);
-        context.setTime(1);
-        store.put(new Windowed<>("a", new SessionWindow(0, 10)), "b");
-        store.put(new Windowed<>("b", new SessionWindow(0, 10)), "c");
-        assertThat(((WrappedStateStore) store).wrappedStore(), is(instanceOf(CachingSessionStore.class)));
-        assertThat(cache.size(), is(2L));
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenCached() {
-        store = createStore(false, true);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenLogged() {
-        store = createStore(true, false);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenNotLoggedOrCached() {
-        store = createStore(false, false);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-
-
-    private SessionStore<String, String> createStore(final boolean logged, final boolean cached) {
-        return new RocksDBSessionStoreSupplier<>(STORE_NAME,
-                                                 10,
-                                                 Serdes.String(),
-                                                 Serdes.String(),
-                                                 logged,
-                                                 Collections.<String, String>emptyMap(),
-                                                 cached).get();
-    }
-
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
deleted file mode 100644
index 7409a13c154..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class RocksDBWindowStoreSupplierTest {
-
-    private static final String STORE_NAME = "name";
-    private WindowStore<String, String> store;
-    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                          Serdes.String(),
-                                                                          Serdes.String(),
-                                                                          new NoOpRecordCollector(),
-                                                                          cache);
-
-    @After
-    public void close() {
-        if (store != null) {
-            store.close();
-        }
-    }
-
-    @Test
-    public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() {
-        store = createStore(true, false, 3);
-        final List<ProducerRecord> logged = new ArrayList<>();
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
-            }
-        };
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.String(),
-                                                                      collector,
-                                                                      cache);
-        context.setTime(1);
-        store.init(context, store);
-        store.put("a", "b");
-        assertFalse(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() {
-        store = createStore(false, false, 3);
-        final List<ProducerRecord> logged = new ArrayList<>();
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
-            }
-        };
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.String(),
-                                                                      collector,
-                                                                      cache);
-        context.setTime(1);
-        store.init(context, store);
-        store.put("a", "b");
-        assertTrue(logged.isEmpty());
-    }
-
-    @Test
-    public void shouldBeCachedWindowStoreWhenCachingEnabled() {
-        store = createStore(false, true, 3);
-        store.init(context, store);
-        context.setTime(1);
-        store.put("a", "b");
-        store.put("b", "c");
-        assertThat(((WrappedStateStore) store).wrappedStore(), is(instanceOf(CachingWindowStore.class)));
-        assertThat(context.getCache().size(), is(2L));
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreAsOuterMost() {
-        assertThat(createStore(false, false, 2), instanceOf(MeteredWindowStore.class));
-        assertThat(createStore(false, true, 2), instanceOf(MeteredWindowStore.class));
-        assertThat(createStore(true, false, 2), instanceOf(MeteredWindowStore.class));
-    }
-    @Test
-    public void shouldHaveMeteredStoreWhenCached() {
-        store = createStore(false, true, 3);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenLogged() {
-        store = createStore(true, false, 3);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test
-    public void shouldHaveMeteredStoreWhenNotLoggedOrCached() {
-        store = createStore(false, false, 3);
-        store.init(context, store);
-        final StreamsMetrics metrics = context.metrics();
-        assertFalse(metrics.metrics().isEmpty());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowIllegalArgumentExceptionIfNumSegmentsLessThanTwo() {
-        createStore(true, true, 1);
-    }
-
-    @SuppressWarnings("unchecked")
-    private WindowStore<String, String> createStore(final boolean logged, final boolean cached, final int numSegments) {
-        return new RocksDBWindowStoreSupplier<>(STORE_NAME,
-                                                10,
-                                                numSegments,
-                                                false,
-                                                Serdes.String(),
-                                                Serdes.String(),
-                                                10,
-                                                logged,
-                                                Collections.<String, String>emptyMap(),
-                                                cached).get();
-    }
-
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
deleted file mode 100644
index b1818c21ef1..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-
-import java.util.Collections;
-
-@SuppressWarnings("unchecked")
-public class StateStoreTestUtils {
-
-    public static <K, V> KeyValueStore<K, V> newKeyValueStore(final String name,
-                                                              final String applicationId,
-                                                              final Class<K> keyType,
-                                                              final Class<V> valueType) {
-        final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name,
-                                                                                                 null,
-                                                                                                 null,
-                                                                                                 new MockTime(),
-                                                                                                 false,
-                                                                                                 Collections.<String, String>emptyMap());
-
-        final StateStore stateStore = supplier.get();
-        stateStore.init(
-            new InternalMockProcessorContext(
-                StateSerdes.withBuiltinTypes(
-                    ProcessorStateManager.storeChangelogTopic(applicationId, name),
-                    keyType,
-                    valueType),
-                new NoOpRecordCollector()),
-            stateStore);
-        return (KeyValueStore<K, V>) stateStore;
-
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index dc045360c1f..c24122abd13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -20,13 +20,14 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
@@ -68,21 +69,13 @@
     private StreamThread threadMock;
     private Map<TaskId, StreamTask> tasks;
 
-    @SuppressWarnings("deprecation")
     @Before
     public void before() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("the-source", topicName);
-        builder.addProcessor("the-processor", new MockProcessorSupplier(), 
"the-source");
-        builder.addStateStore(Stores.create("kv-store")
-            .withStringKeys()
-            .withStringValues().inMemory().build(), "the-processor");
-
-        builder.addStateStore(Stores.create("window-store")
-            .withStringKeys()
-            .withStringValues()
-            .persistent()
-            .windowed(10, 10, 2, false).build(), "the-processor");
+        final TopologyWrapper topology = new TopologyWrapper();
+        topology.addSource("the-source", topicName);
+        topology.addProcessor("the-processor", new MockProcessorSupplier(), 
"the-source");
+        
topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"),
 Serdes.String(), Serdes.String()), "the-processor");
+        
topology.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("window-store",
 10, 2, 2, false), Serdes.String(), Serdes.String()), "the-processor");
 
         final Properties properties = new Properties();
         final String applicationId = "applicationId";
@@ -96,17 +89,17 @@ public void before() {
         configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog");
         configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog");
 
-        builder.setApplicationId(applicationId);
-        final ProcessorTopology topology = builder.build(null);
+        topology.setApplicationId(applicationId);
+        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();
 
         tasks = new HashMap<>();
         stateDirectory = new StateDirectory(streamsConfig, new MockTime());
 
-        taskOne = createStreamsTask(streamsConfig, clientSupplier, topology, new TaskId(0, 0));
+        taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 0));
         taskOne.initializeStateStores();
         tasks.put(new TaskId(0, 0), taskOne);
 
-        final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, topology, new TaskId(0, 1));
+        final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 1));
         taskTwo.initializeStateStores();
         tasks.put(new TaskId(0, 1), taskTwo);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index b5379d7142f..06c14eef124 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.NoOpWindowStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,9 +44,15 @@ public void before() {
         final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false);
 
 
-        stubProviderOne.addStore("kv", 
StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, 
String.class));
+        stubProviderOne.addStore("kv", 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
+                Serdes.serdeFrom(String.class),
+                Serdes.serdeFrom(String.class))
+                .build());
         stubProviderOne.addStore("window", new NoOpWindowStore());
-        stubProviderTwo.addStore("kv", 
StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, 
String.class));
+        stubProviderTwo.addStore("kv", 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
+                Serdes.serdeFrom(String.class),
+                Serdes.serdeFrom(String.class))
+                .build());
         stubProviderTwo.addStore("window", new NoOpWindowStore());
 
         wrappingStoreProvider = new WrappingStoreProvider(


 

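For readers following the migration: the pattern applied throughout the diff replaces the removed StateStoreSupplier-based helpers with the KIP-182 StoreBuilder API. Below is a minimal sketch of that pattern, assuming the kafka-streams 2.0 Stores factory methods shown in the updated tests above; the store name and the logging config map are illustrative only.

    import java.util.Collections;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class StoreBuilderSketch {
        public static void main(final String[] args) {
            // Replaces the removed Stores.create("...").withStringKeys().withStringValues()
            // chain and the RocksDB*StoreSupplier classes deleted above.
            final StoreBuilder<KeyValueStore<String, String>> builder =
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore("kv-store"), // inner store supplier
                        Serdes.String(),                          // key serde
                        Serdes.String())                          // value serde
                    // Logging and caching replace the boolean flags the old suppliers took:
                    .withLoggingEnabled(Collections.<String, String>emptyMap())
                    .withCachingEnabled();

            // build() returns the store already wrapped in the metering, caching,
            // and logging layers that the deleted supplier tests asserted on.
            final KeyValueStore<String, String> store = builder.build();
            System.out.println(store.name()); // prints "kv-store"
        }
    }

A builder constructed this way is exactly what the updated tests hand to topology.addStateStore(...); the wrapping is performed by build() rather than by per-supplier boolean flags.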
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove deprecated APIs from KIP-120 and KIP-182 in Streams
> ----------------------------------------------------------
>
>                 Key: KAFKA-6813
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6813
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>             Fix For: 2.0.0
>
>
> As we move on to the next major release 2.0, we can consider removing the 
> deprecated APIs from KIP-120 and KIP-182.
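For illustration, here is a minimal sketch of wiring a comparable test topology against the public Topology API that supersedes the removed TopologyBuilder (KIP-120), again assuming kafka-streams 2.0; the topic name and the no-op processor are placeholders, not part of the PR.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.state.Stores;

    public class TopologySketch {
        public static void main(final String[] args) {
            final Topology topology = new Topology();
            topology.addSource("the-source", "input-topic");
            // ProcessorSupplier has a single get() method, so a lambda suffices;
            // this no-op processor stands in for real logic.
            topology.addProcessor("the-processor",
                () -> new AbstractProcessor<String, String>() {
                    @Override
                    public void process(final String key, final String value) {
                        // no-op
                    }
                },
                "the-source");
            // State stores attach via KIP-182 StoreBuilders, as in the updated tests:
            topology.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("kv-store"),
                    Serdes.String(),
                    Serdes.String()),
                "the-processor");
            System.out.println(topology.describe());
        }
    }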



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
